From 998c41f326b030d525c7192afcc8fa64e12c84d0 Mon Sep 17 00:00:00 2001 From: Benjamin Sims Date: Wed, 5 Jul 2017 13:33:05 +0100 Subject: [PATCH 1/9] Change the docker import so that it works with the latest version of the library --- airflow/operators/docker_operator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/operators/docker_operator.py b/airflow/operators/docker_operator.py index bca604cba9710..e67ccd31fb13f 100644 --- a/airflow/operators/docker_operator.py +++ b/airflow/operators/docker_operator.py @@ -18,7 +18,7 @@ from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults from airflow.utils.file import TemporaryDirectory -from docker import Client, tls +from docker import APIClient as Client, tls import ast From 447649324925f9cce4c23027d534b3895a980557 Mon Sep 17 00:00:00 2001 From: Benjamin Sims Date: Wed, 5 Jul 2017 13:39:14 +0100 Subject: [PATCH 2/9] Add in parameters for setting the host temp directory --- airflow/operators/docker_operator.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/airflow/operators/docker_operator.py b/airflow/operators/docker_operator.py index bca604cba9710..33c19b4a63019 100644 --- a/airflow/operators/docker_operator.py +++ b/airflow/operators/docker_operator.py @@ -46,6 +46,9 @@ class DockerOperator(BaseOperator): :type environment: dict :param force_pull: Pull the docker image on every run. :type force_pull: bool + :param host_tmp_dir: Specify the location of the temporary directory on the host which will + be mapped to tmp_dir + :type host_tmp_dir: str :param mem_limit: Maximum amount of memory the container can use. Either a float value, which represents the limit in bytes, or a string like ``128m`` or ``1g``. :type mem_limit: float or str @@ -89,6 +92,7 @@ def __init__( docker_url='unix://var/run/docker.sock', environment=None, force_pull=False, + host_tmp_dir=None, mem_limit=None, network_mode=None, tls_ca_cert=None, @@ -111,6 +115,7 @@ def __init__( self.docker_url = docker_url self.environment = environment or {} self.force_pull = force_pull + self.host_tmp_dir = host_tmp_dir self.image = image self.mem_limit = mem_limit self.network_mode = network_mode @@ -157,7 +162,7 @@ def execute(self, context): cpu_shares = int(round(self.cpus * 1024)) - with TemporaryDirectory(prefix='airflowtmp') as host_tmp_dir: + with TemporaryDirectory(prefix='airflowtmp', dir=self.host_tmp_dir) as host_tmp_dir: self.environment['AIRFLOW_TMP_DIR'] = self.tmp_dir self.volumes.append('{0}:{1}'.format(host_tmp_dir, self.tmp_dir)) From 0581d1d7f0acfdcb0c9b55a1d4496b3b567f243a Mon Sep 17 00:00:00 2001 From: Benjamin Sims Date: Wed, 5 Jul 2017 13:51:55 +0100 Subject: [PATCH 3/9] Add the option to set the working directory on the container --- airflow/operators/docker_operator.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/airflow/operators/docker_operator.py b/airflow/operators/docker_operator.py index 7aa96b42802c2..1a5c40813c0ba 100644 --- a/airflow/operators/docker_operator.py +++ b/airflow/operators/docker_operator.py @@ -73,6 +73,9 @@ class DockerOperator(BaseOperator): :type user: int or str :param volumes: List of volumes to mount into the container, e.g. ``['/host/path:/container/path', '/host/path2:/container/path2:ro']``. + :param working_dir: Working directory to set on the container (equivalent to the -w switch + the docker client) + :type working_dir: str :param xcom_push: Does the stdout will be pushed to the next step using XCom. The default is False. :type xcom_push: bool @@ -103,6 +106,7 @@ def __init__( tmp_dir='/tmp/airflow', user=None, volumes=None, + working_dir=None, xcom_push=False, xcom_all=False, *args, @@ -127,6 +131,7 @@ def __init__( self.tmp_dir = tmp_dir self.user = user self.volumes = volumes or [] + self.working_dir = working_dir self.xcom_push_flag = xcom_push self.xcom_all = xcom_all @@ -174,7 +179,8 @@ def execute(self, context): network_mode=self.network_mode), image=image, mem_limit=self.mem_limit, - user=self.user + user=self.user, + working_dir=self.working_dir ) self.cli.start(self.container['Id']) From f836e9521569cc65fd1562d0d47b141ca78615ba Mon Sep 17 00:00:00 2001 From: Benjamin Sims Date: Wed, 5 Jul 2017 13:33:05 +0100 Subject: [PATCH 4/9] [AIRFLOW-1380] Update docker import to latest lib Changes import of Docker client library to make it compatible with the latest version. --- airflow/operators/docker_operator.py | 2 +- tests/operators/docker_operator.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/operators/docker_operator.py b/airflow/operators/docker_operator.py index bca604cba9710..e67ccd31fb13f 100644 --- a/airflow/operators/docker_operator.py +++ b/airflow/operators/docker_operator.py @@ -18,7 +18,7 @@ from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults from airflow.utils.file import TemporaryDirectory -from docker import Client, tls +from docker import APIClient as Client, tls import ast diff --git a/tests/operators/docker_operator.py b/tests/operators/docker_operator.py index 2554aaa76d8a0..efec3be1ea826 100644 --- a/tests/operators/docker_operator.py +++ b/tests/operators/docker_operator.py @@ -17,7 +17,7 @@ try: from airflow.operators.docker_operator import DockerOperator - from docker.client import Client + from docker.client import APIClient as Client except ImportError: pass From 4202b9a09f761c3e14af965b5e01e9622056eef8 Mon Sep 17 00:00:00 2001 From: Benjamin Sims Date: Wed, 5 Jul 2017 13:39:14 +0100 Subject: [PATCH 5/9] [AIRFLOW-1381] Specify host temporary directory Allow user to specify temporary directory to use on the host machine; default settings will cause an error on OS X due to the standard temporary directory not being shared to Docker. --- airflow/operators/docker_operator.py | 7 ++++++- tests/operators/docker_operator.py | 3 +++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/airflow/operators/docker_operator.py b/airflow/operators/docker_operator.py index bca604cba9710..33c19b4a63019 100644 --- a/airflow/operators/docker_operator.py +++ b/airflow/operators/docker_operator.py @@ -46,6 +46,9 @@ class DockerOperator(BaseOperator): :type environment: dict :param force_pull: Pull the docker image on every run. :type force_pull: bool + :param host_tmp_dir: Specify the location of the temporary directory on the host which will + be mapped to tmp_dir + :type host_tmp_dir: str :param mem_limit: Maximum amount of memory the container can use. Either a float value, which represents the limit in bytes, or a string like ``128m`` or ``1g``. :type mem_limit: float or str @@ -89,6 +92,7 @@ def __init__( docker_url='unix://var/run/docker.sock', environment=None, force_pull=False, + host_tmp_dir=None, mem_limit=None, network_mode=None, tls_ca_cert=None, @@ -111,6 +115,7 @@ def __init__( self.docker_url = docker_url self.environment = environment or {} self.force_pull = force_pull + self.host_tmp_dir = host_tmp_dir self.image = image self.mem_limit = mem_limit self.network_mode = network_mode @@ -157,7 +162,7 @@ def execute(self, context): cpu_shares = int(round(self.cpus * 1024)) - with TemporaryDirectory(prefix='airflowtmp') as host_tmp_dir: + with TemporaryDirectory(prefix='airflowtmp', dir=self.host_tmp_dir) as host_tmp_dir: self.environment['AIRFLOW_TMP_DIR'] = self.tmp_dir self.volumes.append('{0}:{1}'.format(host_tmp_dir, self.tmp_dir)) diff --git a/tests/operators/docker_operator.py b/tests/operators/docker_operator.py index 2554aaa76d8a0..8ae38c5502393 100644 --- a/tests/operators/docker_operator.py +++ b/tests/operators/docker_operator.py @@ -51,6 +51,7 @@ def test_execute(self, client_class_mock, mkdtemp_mock): client_class_mock.return_value = client_mock operator = DockerOperator(api_version='1.19', command='env', environment={'UNIT': 'TEST'}, + host_tmp_dir='/host/airflow', image='ubuntu:latest', network_mode='bridge', owner='unittest', task_id='unittest', volumes=['/host/path:/container/path']) operator.execute(None) @@ -66,6 +67,8 @@ def test_execute(self, client_class_mock, mkdtemp_mock): host_config=host_config, image='ubuntu:latest', mem_limit=None, user=None) + mkdtemp_mock.assert_called_with(dir='/host/airflow', + prefix='airflowtmp', suffix='') client_mock.create_host_config.assert_called_with(binds=['/host/path:/container/path', '/mkdtemp:/tmp/airflow'], network_mode='bridge') From eb158e121a7729894fc8823e50e3f413d91141b6 Mon Sep 17 00:00:00 2001 From: Benjamin Sims Date: Wed, 5 Jul 2017 13:51:55 +0100 Subject: [PATCH 6/9] [AIRFLOW-1382] Add working dir option Allow the user to specify the working directory to be used in the created container. Equivalent to docker run/create -w. --- airflow/operators/docker_operator.py | 8 +++++++- tests/operators/docker_operator.py | 7 +++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/airflow/operators/docker_operator.py b/airflow/operators/docker_operator.py index bca604cba9710..ddcc97b8d5432 100644 --- a/airflow/operators/docker_operator.py +++ b/airflow/operators/docker_operator.py @@ -70,6 +70,9 @@ class DockerOperator(BaseOperator): :type user: int or str :param volumes: List of volumes to mount into the container, e.g. ``['/host/path:/container/path', '/host/path2:/container/path2:ro']``. + :param working_dir: Working directory to set on the container (equivalent to the -w switch + the docker client) + :type working_dir: str :param xcom_push: Does the stdout will be pushed to the next step using XCom. The default is False. :type xcom_push: bool @@ -99,6 +102,7 @@ def __init__( tmp_dir='/tmp/airflow', user=None, volumes=None, + working_dir=None, xcom_push=False, xcom_all=False, *args, @@ -122,6 +126,7 @@ def __init__( self.tmp_dir = tmp_dir self.user = user self.volumes = volumes or [] + self.working_dir = working_dir self.xcom_push_flag = xcom_push self.xcom_all = xcom_all @@ -169,7 +174,8 @@ def execute(self, context): network_mode=self.network_mode), image=image, mem_limit=self.mem_limit, - user=self.user + user=self.user, + working_dir=self.working_dir ) self.cli.start(self.container['Id']) diff --git a/tests/operators/docker_operator.py b/tests/operators/docker_operator.py index 2554aaa76d8a0..264096cc1d316 100644 --- a/tests/operators/docker_operator.py +++ b/tests/operators/docker_operator.py @@ -52,7 +52,8 @@ def test_execute(self, client_class_mock, mkdtemp_mock): operator = DockerOperator(api_version='1.19', command='env', environment={'UNIT': 'TEST'}, image='ubuntu:latest', network_mode='bridge', owner='unittest', - task_id='unittest', volumes=['/host/path:/container/path']) + task_id='unittest', volumes=['/host/path:/container/path'], + working_dir='/container/path') operator.execute(None) client_class_mock.assert_called_with(base_url='unix://var/run/docker.sock', tls=None, @@ -65,7 +66,9 @@ def test_execute(self, client_class_mock, mkdtemp_mock): }, host_config=host_config, image='ubuntu:latest', - mem_limit=None, user=None) + mem_limit=None, user=None, + working_dir='/container/path' + ) client_mock.create_host_config.assert_called_with(binds=['/host/path:/container/path', '/mkdtemp:/tmp/airflow'], network_mode='bridge') From f9b26876626972f2e356c244fe7ddb5f98aaeab4 Mon Sep 17 00:00:00 2001 From: maikeldotuk Date: Mon, 2 Oct 2017 18:28:26 +0100 Subject: [PATCH 7/9] Added requirements file --- requirements.txt | 64 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 requirements.txt diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000000000..664c15f1eaef6 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,64 @@ +alembic==0.8.10 +-e git+https://github.com/benjaminsims/incubator-airflow@cf4624f847fb5f57c27c02aefea6040099f9631c#egg=apache_airflow +asn1crypto==0.23.0 +bleach==2.0.0 +certifi==2017.7.27.1 +cffi==1.11.0 +chardet==3.0.4 +click==6.7 +configparser==3.5.0 +croniter==0.3.19 +cryptography==2.0.3 +dill==0.2.7.1 +docker==2.5.1 +docker-pycreds==0.2.1 +docutils==0.14 +Flask==0.11.1 +Flask-Admin==1.4.1 +Flask-Cache==0.13.1 +Flask-Login==0.2.11 +flask-swagger==0.2.13 +Flask-WTF==0.14 +funcsigs==1.0.0 +future==0.16.0 +gitdb2==2.0.3 +GitPython==2.1.7 +gunicorn==19.3.0 +html5lib==0.999999999 +idna==2.6 +itsdangerous==0.24 +Jinja2==2.8.1 +jsonschema==2.6.0 +lockfile==0.12.2 +lxml==3.8.0 +Mako==1.0.7 +Markdown==2.6.9 +MarkupSafe==1.0 +numpy==1.13.3 +ordereddict==1.1 +pandas==0.20.3 +psutil==4.4.2 +pycparser==2.18 +Pygments==2.2.0 +python-daemon==2.1.2 +python-dateutil==2.6.1 +python-editor==1.0.3 +python-nvd3==0.14.2 +python-slugify==1.1.4 +pytz==2017.2 +PyYAML==3.12 +requests==2.18.4 +setproctitle==1.1.10 +six==1.11.0 +smmap2==2.0.3 +SQLAlchemy==1.1.14 +tabulate==0.7.7 +thrift==0.9.3 +tqdm==4.18.0 +Unidecode==0.4.21 +urllib3==1.22 +webencodings==0.5.1 +websocket-client==0.44.0 +Werkzeug==0.12.2 +WTForms==2.1 +zope.deprecation==4.3.0 From 9d8dbe781d29310604a7dc25a752ac5886ef112e Mon Sep 17 00:00:00 2001 From: maikeldotuk Date: Mon, 2 Oct 2017 18:30:12 +0100 Subject: [PATCH 8/9] Modified requirements to only include what is not in setup.py --- requirements.txt | 60 ------------------------------------------------ 1 file changed, 60 deletions(-) diff --git a/requirements.txt b/requirements.txt index 664c15f1eaef6..e0f58b87c59e5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,64 +1,4 @@ -alembic==0.8.10 --e git+https://github.com/benjaminsims/incubator-airflow@cf4624f847fb5f57c27c02aefea6040099f9631c#egg=apache_airflow -asn1crypto==0.23.0 -bleach==2.0.0 -certifi==2017.7.27.1 -cffi==1.11.0 -chardet==3.0.4 -click==6.7 -configparser==3.5.0 -croniter==0.3.19 cryptography==2.0.3 -dill==0.2.7.1 docker==2.5.1 -docker-pycreds==0.2.1 -docutils==0.14 -Flask==0.11.1 -Flask-Admin==1.4.1 -Flask-Cache==0.13.1 -Flask-Login==0.2.11 -flask-swagger==0.2.13 -Flask-WTF==0.14 -funcsigs==1.0.0 -future==0.16.0 -gitdb2==2.0.3 -GitPython==2.1.7 -gunicorn==19.3.0 -html5lib==0.999999999 -idna==2.6 -itsdangerous==0.24 -Jinja2==2.8.1 jsonschema==2.6.0 -lockfile==0.12.2 -lxml==3.8.0 -Mako==1.0.7 -Markdown==2.6.9 -MarkupSafe==1.0 -numpy==1.13.3 -ordereddict==1.1 -pandas==0.20.3 -psutil==4.4.2 -pycparser==2.18 -Pygments==2.2.0 -python-daemon==2.1.2 -python-dateutil==2.6.1 -python-editor==1.0.3 -python-nvd3==0.14.2 -python-slugify==1.1.4 -pytz==2017.2 -PyYAML==3.12 -requests==2.18.4 -setproctitle==1.1.10 -six==1.11.0 -smmap2==2.0.3 -SQLAlchemy==1.1.14 -tabulate==0.7.7 -thrift==0.9.3 tqdm==4.18.0 -Unidecode==0.4.21 -urllib3==1.22 -webencodings==0.5.1 -websocket-client==0.44.0 -Werkzeug==0.12.2 -WTForms==2.1 -zope.deprecation==4.3.0 From 9dfd6bf002299ebf0252ae25697ae841a1f1cdf9 Mon Sep 17 00:00:00 2001 From: Benjamin Sims Date: Mon, 18 Dec 2017 15:00:56 +0000 Subject: [PATCH 9/9] Make the hook work with recent Docker verions --- airflow/hooks/docker_hook.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/hooks/docker_hook.py b/airflow/hooks/docker_hook.py index a570292ead2b9..3512641c49f2e 100644 --- a/airflow/hooks/docker_hook.py +++ b/airflow/hooks/docker_hook.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from docker import Client +from docker import APIClient as Client from docker.errors import APIError from airflow.exceptions import AirflowException