Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,8 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
return found_dags

mods = []
if not zipfile.is_zipfile(filepath):
is_zipfile = zipfile.is_zipfile(filepath)
if not is_zipfile:
if safe_mode and os.path.isfile(filepath):
with open(filepath, 'rb') as f:
content = f.read()
Expand Down Expand Up @@ -409,7 +410,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
if isinstance(dag, DAG):
if not dag.full_filepath:
dag.full_filepath = filepath
if dag.fileloc != filepath:
if dag.fileloc != filepath and not is_zipfile:
dag.fileloc = filepath
try:
dag.is_subdag = False
Expand Down
22 changes: 21 additions & 1 deletion airflow/www/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,21 @@
# flake8: noqa: E402
import inspect
from future import standard_library
standard_library.install_aliases()
standard_library.install_aliases() # noqa: E402
from builtins import str, object

from cgi import escape
from io import BytesIO as IO
import functools
import gzip
import io
import json
import os
import re
import time
import wtforms
from wtforms.compat import text_type
import zipfile

from flask import after_this_request, request, Response
from flask_admin.model import filters
Expand Down Expand Up @@ -372,6 +376,22 @@ def zipper(response):
return view_func


def open_maybe_zipped(f, mode='r'):
"""
Opens the given file. If the path contains a folder with a .zip suffix, then
the folder is treated as a zip archive, opening the file inside the archive.

:return: a file object, as in `open`, or as in `ZipFile.open`.
"""

_, archive, filename = re.search(
r'((.*\.zip){})?(.*)'.format(re.escape(os.sep)), f).groups()
if archive and zipfile.is_zipfile(archive):
return zipfile.ZipFile(archive, mode=mode).open(filename)
else:
return io.open(f, mode=mode)


def make_cache_key(*args, **kwargs):
"""
Used by cache to get a unique key per URL
Expand Down
2 changes: 1 addition & 1 deletion airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ def code(self):
dag = dagbag.get_dag(dag_id)
title = dag_id
try:
with open(dag.fileloc, 'r') as f:
with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f:
code = f.read()
html_code = highlight(
code, lexers.PythonLexer(), HtmlFormatter(linenos=True))
Expand Down
20 changes: 20 additions & 0 deletions airflow/www_rbac/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
import wtforms
import bleach
import markdown
import re
import zipfile
import os
import io

from builtins import str
from past.builtins import basestring
Expand Down Expand Up @@ -202,6 +206,22 @@ def json_response(obj):
mimetype="application/json")


def open_maybe_zipped(f, mode='r'):
"""
Opens the given file. If the path contains a folder with a .zip suffix, then
the folder is treated as a zip archive, opening the file inside the archive.

:return: a file object, as in `open`, or as in `ZipFile.open`.
"""

_, archive, filename = re.search(
r'((.*\.zip){})?(.*)'.format(re.escape(os.sep)), f).groups()
if archive and zipfile.is_zipfile(archive):
return zipfile.ZipFile(archive, mode=mode).open(filename)
else:
return io.open(f, mode=mode)


def make_cache_key(*args, **kwargs):
"""
Used by cache to get a unique key per URL
Expand Down
2 changes: 1 addition & 1 deletion airflow/www_rbac/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ def code(self):
dag = dagbag.get_dag(dag_id)
title = dag_id
try:
with open(dag.fileloc, 'r') as f:
with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f:
code = f.read()
html_code = highlight(
code, lexers.PythonLexer(), HtmlFormatter(linenos=True))
Expand Down
34 changes: 34 additions & 0 deletions tests/www/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,40 @@ def some_func():
self.assertEqual(anonymous_username, kwargs['owner'])
mocked_session_instance.add.assert_called_once()

def test_open_maybe_zipped_normal_file(self):
with mock.patch(
'io.open', mock.mock_open(read_data="data")) as mock_file:
utils.open_maybe_zipped('/path/to/some/file.txt')
mock_file.assert_called_with('/path/to/some/file.txt', mode='r')

def test_open_maybe_zipped_normal_file_with_zip_in_name(self):
path = '/path/to/fakearchive.zip.other/file.txt'
with mock.patch(
'io.open', mock.mock_open(read_data="data")) as mock_file:
utils.open_maybe_zipped(path)
mock_file.assert_called_with(path, mode='r')

@mock.patch("zipfile.is_zipfile")
@mock.patch("zipfile.ZipFile")
def test_open_maybe_zipped_archive(self, mocked_ZipFile, mocked_is_zipfile):
mocked_is_zipfile.return_value = True
instance = mocked_ZipFile.return_value
instance.open.return_value = mock.mock_open(read_data="data")

utils.open_maybe_zipped('/path/to/archive.zip/deep/path/to/file.txt')

mocked_is_zipfile.assert_called_once()
(args, kwargs) = mocked_is_zipfile.call_args_list[0]
self.assertEqual('/path/to/archive.zip', args[0])

mocked_ZipFile.assert_called_once()
(args, kwargs) = mocked_ZipFile.call_args_list[0]
self.assertEqual('/path/to/archive.zip', args[0])

instance.open.assert_called_once()
(args, kwargs) = instance.open.call_args_list[0]
self.assertEqual('deep/path/to/file.txt', args[0])

def test_get_python_source_from_method(self):
class AMockClass(object):
def a_method(self):
Expand Down
35 changes: 35 additions & 0 deletions tests/www_rbac/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
# under the License.

import unittest
import mock
from xml.dom import minidom

from airflow.www_rbac import utils
Expand Down Expand Up @@ -113,6 +114,40 @@ def test_params_all(self):
self.assertEqual('page=3&search=bash_&showPaused=False',
utils.get_params(showPaused=False, page=3, search='bash_'))

def test_open_maybe_zipped_normal_file(self):
with mock.patch(
'io.open', mock.mock_open(read_data="data")) as mock_file:
utils.open_maybe_zipped('/path/to/some/file.txt')
mock_file.assert_called_with('/path/to/some/file.txt', mode='r')

def test_open_maybe_zipped_normal_file_with_zip_in_name(self):
path = '/path/to/fakearchive.zip.other/file.txt'
with mock.patch(
'io.open', mock.mock_open(read_data="data")) as mock_file:
utils.open_maybe_zipped(path)
mock_file.assert_called_with(path, mode='r')

@mock.patch("zipfile.is_zipfile")
@mock.patch("zipfile.ZipFile")
def test_open_maybe_zipped_archive(self, mocked_ZipFile, mocked_is_zipfile):
mocked_is_zipfile.return_value = True
instance = mocked_ZipFile.return_value
instance.open.return_value = mock.mock_open(read_data="data")

utils.open_maybe_zipped('/path/to/archive.zip/deep/path/to/file.txt')

mocked_is_zipfile.assert_called_once()
(args, kwargs) = mocked_is_zipfile.call_args_list[0]
self.assertEqual('/path/to/archive.zip', args[0])

mocked_ZipFile.assert_called_once()
(args, kwargs) = mocked_ZipFile.call_args_list[0]
self.assertEqual('/path/to/archive.zip', args[0])

instance.open.assert_called_once()
(args, kwargs) = instance.open.call_args_list[0]
self.assertEqual('deep/path/to/file.txt', args[0])


if __name__ == '__main__':
unittest.main()