From c937467e53f6739238930c265f4dfbda5c5799e8 Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Fri, 21 Sep 2018 14:55:33 +0200
Subject: [PATCH] [AIRFLOW-1390] Update Alembic to 0.9

---
 setup.py                          |  2 +-
 tests/hooks/test_postgres_hook.py | 27 +++++++++++++--------
 tests/operators/operators.py      |  7 ++++++
 tests/utils/test_db.py            | 39 +++++++++++++++++++++++--------
 4 files changed, 54 insertions(+), 21 deletions(-)

diff --git a/setup.py b/setup.py
index aecc21817025c..8de62cae589cf 100644
--- a/setup.py
+++ b/setup.py
@@ -292,7 +292,7 @@ def do_setup():
         zip_safe=False,
         scripts=['airflow/bin/airflow'],
         install_requires=[
-            'alembic>=0.8.3, <0.9',
+            'alembic>=0.9, <1.0',
             'bleach~=2.1.3',
             'configparser>=3.5.0, <3.6.0',
             'croniter>=0.3.17, <0.4',
diff --git a/tests/hooks/test_postgres_hook.py b/tests/hooks/test_postgres_hook.py
index 0520239e06ee8..3e71f60f5b8d0 100644
--- a/tests/hooks/test_postgres_hook.py
+++ b/tests/hooks/test_postgres_hook.py
@@ -28,6 +28,10 @@
 
 class TestPostgresHook(unittest.TestCase):
 
+    def __init__(self, *args, **kwargs):
+        super(TestPostgresHook, self).__init__(*args, **kwargs)
+        self.table = "test_postgres_hook_table"
+
     def setUp(self):
         super(TestPostgresHook, self).setUp()
 
@@ -43,6 +47,13 @@ def get_conn(self):
 
         self.db_hook = UnitTestPostgresHook()
 
+    def tearDown(self):
+        super(TestPostgresHook, self).tearDown()
+
+        with PostgresHook().get_conn() as conn:
+            with conn.cursor() as cur:
+                cur.execute("DROP TABLE IF EXISTS {}".format(self.table))
+
     def test_copy_expert(self):
         m = mock.mock_open(read_data='{"some": "json"}')
         with mock.patch('airflow.hooks.postgres_hook.open', m):
@@ -61,40 +72,36 @@ def test_copy_expert(self):
 
     def test_bulk_load(self):
         hook = PostgresHook()
-        table = "t"
         input_data = ["foo", "bar", "baz"]
 
         with hook.get_conn() as conn:
             with conn.cursor() as cur:
-                cur.execute("DROP TABLE IF EXISTS {}".format(table))
-                cur.execute("CREATE TABLE {} (c VARCHAR)".format(table))
+                cur.execute("CREATE TABLE {} (c VARCHAR)".format(self.table))
                 conn.commit()
 
                 with NamedTemporaryFile() as f:
                     f.write("\n".join(input_data).encode("utf-8"))
                     f.flush()
-                    hook.bulk_load(table, f.name)
+                    hook.bulk_load(self.table, f.name)
 
-                cur.execute("SELECT * FROM {}".format(table))
+                cur.execute("SELECT * FROM {}".format(self.table))
                 results = [row[0] for row in cur.fetchall()]
 
         self.assertEqual(sorted(input_data), sorted(results))
 
     def test_bulk_dump(self):
         hook = PostgresHook()
-        table = "t"
         input_data = ["foo", "bar", "baz"]
 
         with hook.get_conn() as conn:
             with conn.cursor() as cur:
-                cur.execute("DROP TABLE IF EXISTS {}".format(table))
-                cur.execute("CREATE TABLE {} (c VARCHAR)".format(table))
+                cur.execute("CREATE TABLE {} (c VARCHAR)".format(self.table))
                 values = ",".join("('{}')".format(data) for data in input_data)
-                cur.execute("INSERT INTO {} VALUES {}".format(table, values))
+                cur.execute("INSERT INTO {} VALUES {}".format(self.table, values))
                 conn.commit()
 
                 with NamedTemporaryFile() as f:
-                    hook.bulk_dump(table, f.name)
+                    hook.bulk_dump(self.table, f.name)
                     f.seek(0)
                     results = [line.rstrip().decode("utf-8")
                                for line in f.readlines()]
diff --git a/tests/operators/operators.py b/tests/operators/operators.py
index 82870ac3eefbd..0048a96837712 100644
--- a/tests/operators/operators.py
+++ b/tests/operators/operators.py
@@ -47,6 +47,13 @@ def setUp(self):
         dag = DAG(TEST_DAG_ID, default_args=args)
         self.dag = dag
 
+    def tearDown(self):
+        from airflow.hooks.mysql_hook import MySqlHook
+        drop_tables = {'test_mysql_to_mysql', 'test_airflow'}
+        with MySqlHook().get_conn() as conn:
+            for table in drop_tables:
+                conn.execute("DROP TABLE IF EXISTS {}".format(table))
+
     def test_mysql_operator_test(self):
         sql = """
         CREATE TABLE IF NOT EXISTS test_airflow (
diff --git a/tests/utils/test_db.py b/tests/utils/test_db.py
index 5fdc40b5255be..718e2ae1066fe 100644
--- a/tests/utils/test_db.py
+++ b/tests/utils/test_db.py
@@ -45,20 +45,19 @@ def test_database_schema_and_sqlalchemy_model_are_in_sync(self):
             lambda t: (t[0] == 'remove_column' and
                        t[2] == 'users' and
                        t[3].name == 'password'),
 
-            # ignore tables created by other tests
-            lambda t: (t[0] == 'remove_table' and
-                       t[1].name == 't'),
-            lambda t: (t[0] == 'remove_table' and
-                       t[1].name == 'test_airflow'),
-            lambda t: (t[0] == 'remove_table' and
-                       t[1].name == 'test_postgres_to_postgres'),
-            lambda t: (t[0] == 'remove_table' and
-                       t[1].name == 'test_mysql_to_mysql'),
+            # ignore tables created by celery
             lambda t: (t[0] == 'remove_table' and
                        t[1].name == 'celery_taskmeta'),
             lambda t: (t[0] == 'remove_table' and
                        t[1].name == 'celery_tasksetmeta'),
+
+            # ignore indices created by celery
+            lambda t: (t[0] == 'remove_index' and
+                       t[1].name == 'task_id'),
+            lambda t: (t[0] == 'remove_index' and
+                       t[1].name == 'taskset_id'),
+
             # Ignore all the fab tables
             lambda t: (t[0] == 'remove_table' and
                        t[1].name == 'ab_permission'),
@@ -76,6 +75,23 @@ def test_database_schema_and_sqlalchemy_model_are_in_sync(self):
                        t[1].name == 'ab_user'),
             lambda t: (t[0] == 'remove_table' and
                        t[1].name == 'ab_view_menu'),
+
+            # Ignore all the fab indices
+            lambda t: (t[0] == 'remove_index' and
+                       t[1].name == 'permission_id'),
+            lambda t: (t[0] == 'remove_index' and
+                       t[1].name == 'name'),
+            lambda t: (t[0] == 'remove_index' and
+                       t[1].name == 'user_id'),
+            lambda t: (t[0] == 'remove_index' and
+                       t[1].name == 'username'),
+            lambda t: (t[0] == 'remove_index' and
+                       t[1].name == 'field_string'),
+            lambda t: (t[0] == 'remove_index' and
+                       t[1].name == 'email'),
+            lambda t: (t[0] == 'remove_index' and
+                       t[1].name == 'permission_view_id'),
+
             # from test_security unit test
             lambda t: (t[0] == 'remove_table' and
                        t[1].name == 'some_model'),
@@ -83,4 +99,7 @@
         for ignore in ignores:
             diff = [d for d in diff if not ignore(d)]
 
-        self.assertFalse(diff, 'Database schema and SQLAlchemy model are not in sync')
+        self.assertFalse(
+            diff,
+            'Database schema and SQLAlchemy model are not in sync: ' + str(diff)
+        )