diff --git a/airflow/models.py b/airflow/models.py index 94e18794d6935..ddf309456722b 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -337,7 +337,8 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): return found_dags mods = [] - if not zipfile.is_zipfile(filepath): + is_zipfile = zipfile.is_zipfile(filepath) + if not is_zipfile: if safe_mode and os.path.isfile(filepath): with open(filepath, 'rb') as f: content = f.read() @@ -409,7 +410,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True): if isinstance(dag, DAG): if not dag.full_filepath: dag.full_filepath = filepath - if dag.fileloc != filepath: + if dag.fileloc != filepath and not is_zipfile: dag.fileloc = filepath try: dag.is_subdag = False diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 9ce114d5eda3e..e85bc5909ae27 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -20,17 +20,21 @@ # flake8: noqa: E402 import inspect from future import standard_library -standard_library.install_aliases() +standard_library.install_aliases() # noqa: E402 from builtins import str, object from cgi import escape from io import BytesIO as IO import functools import gzip +import io import json +import os +import re import time import wtforms from wtforms.compat import text_type +import zipfile from flask import after_this_request, request, Response from flask_admin.model import filters @@ -372,6 +376,22 @@ def zipper(response): return view_func +def open_maybe_zipped(f, mode='r'): + """ + Opens the given file. If the path contains a folder with a .zip suffix, then + the folder is treated as a zip archive, opening the file inside the archive. + + :return: a file object, as in `open`, or as in `ZipFile.open`. + """ + + _, archive, filename = re.search( + r'((.*\.zip){})?(.*)'.format(re.escape(os.sep)), f).groups() + if archive and zipfile.is_zipfile(archive): + return zipfile.ZipFile(archive, mode=mode).open(filename) + else: + return io.open(f, mode=mode) + + def make_cache_key(*args, **kwargs): """ Used by cache to get a unique key per URL diff --git a/airflow/www/views.py b/airflow/www/views.py index e1a7caa8bb78c..aa2530e45827a 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -661,7 +661,7 @@ def code(self): dag = dagbag.get_dag(dag_id) title = dag_id try: - with open(dag.fileloc, 'r') as f: + with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f: code = f.read() html_code = highlight( code, lexers.PythonLexer(), HtmlFormatter(linenos=True)) diff --git a/airflow/www_rbac/utils.py b/airflow/www_rbac/utils.py index a0e9258eae30e..0176a5312c373 100644 --- a/airflow/www_rbac/utils.py +++ b/airflow/www_rbac/utils.py @@ -26,6 +26,10 @@ import wtforms import bleach import markdown +import re +import zipfile +import os +import io from builtins import str from past.builtins import basestring @@ -202,6 +206,22 @@ def json_response(obj): mimetype="application/json") +def open_maybe_zipped(f, mode='r'): + """ + Opens the given file. If the path contains a folder with a .zip suffix, then + the folder is treated as a zip archive, opening the file inside the archive. + + :return: a file object, as in `open`, or as in `ZipFile.open`. + """ + + _, archive, filename = re.search( + r'((.*\.zip){})?(.*)'.format(re.escape(os.sep)), f).groups() + if archive and zipfile.is_zipfile(archive): + return zipfile.ZipFile(archive, mode=mode).open(filename) + else: + return io.open(f, mode=mode) + + def make_cache_key(*args, **kwargs): """ Used by cache to get a unique key per URL diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py index d011724cc6811..3dc3400968812 100644 --- a/airflow/www_rbac/views.py +++ b/airflow/www_rbac/views.py @@ -400,7 +400,7 @@ def code(self): dag = dagbag.get_dag(dag_id) title = dag_id try: - with open(dag.fileloc, 'r') as f: + with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f: code = f.read() html_code = highlight( code, lexers.PythonLexer(), HtmlFormatter(linenos=True)) diff --git a/tests/www/test_utils.py b/tests/www/test_utils.py index 9034b8b5fd28e..a06d6b066a113 100644 --- a/tests/www/test_utils.py +++ b/tests/www/test_utils.py @@ -195,6 +195,40 @@ def some_func(): self.assertEqual(anonymous_username, kwargs['owner']) mocked_session_instance.add.assert_called_once() + def test_open_maybe_zipped_normal_file(self): + with mock.patch( + 'io.open', mock.mock_open(read_data="data")) as mock_file: + utils.open_maybe_zipped('/path/to/some/file.txt') + mock_file.assert_called_with('/path/to/some/file.txt', mode='r') + + def test_open_maybe_zipped_normal_file_with_zip_in_name(self): + path = '/path/to/fakearchive.zip.other/file.txt' + with mock.patch( + 'io.open', mock.mock_open(read_data="data")) as mock_file: + utils.open_maybe_zipped(path) + mock_file.assert_called_with(path, mode='r') + + @mock.patch("zipfile.is_zipfile") + @mock.patch("zipfile.ZipFile") + def test_open_maybe_zipped_archive(self, mocked_ZipFile, mocked_is_zipfile): + mocked_is_zipfile.return_value = True + instance = mocked_ZipFile.return_value + instance.open.return_value = mock.mock_open(read_data="data") + + utils.open_maybe_zipped('/path/to/archive.zip/deep/path/to/file.txt') + + mocked_is_zipfile.assert_called_once() + (args, kwargs) = mocked_is_zipfile.call_args_list[0] + self.assertEqual('/path/to/archive.zip', args[0]) + + mocked_ZipFile.assert_called_once() + (args, kwargs) = mocked_ZipFile.call_args_list[0] + self.assertEqual('/path/to/archive.zip', args[0]) + + instance.open.assert_called_once() + (args, kwargs) = instance.open.call_args_list[0] + self.assertEqual('deep/path/to/file.txt', args[0]) + def test_get_python_source_from_method(self): class AMockClass(object): def a_method(self): diff --git a/tests/www_rbac/test_utils.py b/tests/www_rbac/test_utils.py index 1879ba082693d..68d1744ab8fa8 100644 --- a/tests/www_rbac/test_utils.py +++ b/tests/www_rbac/test_utils.py @@ -18,6 +18,7 @@ # under the License. import unittest +import mock from xml.dom import minidom from airflow.www_rbac import utils @@ -113,6 +114,40 @@ def test_params_all(self): self.assertEqual('page=3&search=bash_&showPaused=False', utils.get_params(showPaused=False, page=3, search='bash_')) + def test_open_maybe_zipped_normal_file(self): + with mock.patch( + 'io.open', mock.mock_open(read_data="data")) as mock_file: + utils.open_maybe_zipped('/path/to/some/file.txt') + mock_file.assert_called_with('/path/to/some/file.txt', mode='r') + + def test_open_maybe_zipped_normal_file_with_zip_in_name(self): + path = '/path/to/fakearchive.zip.other/file.txt' + with mock.patch( + 'io.open', mock.mock_open(read_data="data")) as mock_file: + utils.open_maybe_zipped(path) + mock_file.assert_called_with(path, mode='r') + + @mock.patch("zipfile.is_zipfile") + @mock.patch("zipfile.ZipFile") + def test_open_maybe_zipped_archive(self, mocked_ZipFile, mocked_is_zipfile): + mocked_is_zipfile.return_value = True + instance = mocked_ZipFile.return_value + instance.open.return_value = mock.mock_open(read_data="data") + + utils.open_maybe_zipped('/path/to/archive.zip/deep/path/to/file.txt') + + mocked_is_zipfile.assert_called_once() + (args, kwargs) = mocked_is_zipfile.call_args_list[0] + self.assertEqual('/path/to/archive.zip', args[0]) + + mocked_ZipFile.assert_called_once() + (args, kwargs) = mocked_ZipFile.call_args_list[0] + self.assertEqual('/path/to/archive.zip', args[0]) + + instance.open.assert_called_once() + (args, kwargs) = instance.open.call_args_list[0] + self.assertEqual('deep/path/to/file.txt', args[0]) + if __name__ == '__main__': unittest.main()