Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions tests/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1159,12 +1159,19 @@ def with_all_tasks_removed(dag):


class DagBagTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.empty_dir = mkdtemp()

@classmethod
def tearDownClass(cls):
os.rmdir(cls.empty_dir)

def test_get_existing_dag(self):
"""
test that were're able to parse some example DAGs and retrieve them
"""
dagbag = models.DagBag(include_examples=True)
dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=True)

some_expected_dag_ids = ["example_bash_operator",
"example_branch_operator"]
Expand All @@ -1181,7 +1188,7 @@ def test_get_non_existing_dag(self):
"""
test that retrieving a non existing dag id returns None without crashing
"""
dagbag = models.DagBag(include_examples=True)
dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=False)

non_existing_dag_id = "non_existing_dag_id"
self.assertIsNone(dagbag.get_dag(non_existing_dag_id))
Expand All @@ -1194,7 +1201,7 @@ def test_process_file_that_contains_multi_bytes_char(self):
f.write('\u3042'.encode('utf8')) # write multi-byte char (hiragana)
f.flush()

dagbag = models.DagBag(include_examples=True)
dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=False)
self.assertEqual([], dagbag.process_file(f.name))

def test_zip_skip_log(self):
Expand All @@ -1216,7 +1223,7 @@ def test_zip(self):
"""
test the loading of a DAG within a zip file that includes dependencies
"""
dagbag = models.DagBag()
dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=False)
dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, "test_zip.zip"))
self.assertTrue(dagbag.get_dag("test_zip_dag"))

Expand All @@ -1226,7 +1233,7 @@ def test_process_file_cron_validity_check(self):
as schedule interval can be identified
"""
invalid_dag_files = ["test_invalid_cron.py", "test_zip_invalid_cron.zip"]
dagbag = models.DagBag(dag_folder=mkdtemp())
dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=False)

self.assertEqual(len(dagbag.import_errors), 0)
for d in invalid_dag_files:
Expand Down Expand Up @@ -1290,7 +1297,7 @@ def process_dag(self, create_dag):
f.write(source.encode('utf8'))
f.flush()

dagbag = models.DagBag(include_examples=False)
dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=False)
found_dags = dagbag.process_file(f.name)
return dagbag, found_dags, f.name

Expand Down Expand Up @@ -1601,7 +1608,7 @@ def test_process_file_with_none(self):
"""
test that process_file can handle Nones
"""
dagbag = models.DagBag(include_examples=True)
dagbag = models.DagBag(dag_folder=self.empty_dir, include_examples=False)

self.assertEqual([], dagbag.process_file(None))

Expand Down