diff --git a/airflow/hooks/docker_hook.py b/airflow/hooks/docker_hook.py index a570292ead2b9..3512641c49f2e 100644 --- a/airflow/hooks/docker_hook.py +++ b/airflow/hooks/docker_hook.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from docker import Client +from docker import APIClient as Client from docker.errors import APIError from airflow.exceptions import AirflowException diff --git a/airflow/operators/docker_operator.py b/airflow/operators/docker_operator.py index 38edc8b4d79ab..fd540f1b8a125 100644 --- a/airflow/operators/docker_operator.py +++ b/airflow/operators/docker_operator.py @@ -19,7 +19,7 @@ from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults from airflow.utils.file import TemporaryDirectory -from docker import Client, tls +from docker import APIClient as Client, tls import ast @@ -53,6 +53,9 @@ class DockerOperator(BaseOperator): :type environment: dict :param force_pull: Pull the docker image on every run. Default is false. :type force_pull: bool + :param host_tmp_dir: Specify the location of the temporary directory on the host which will + be mapped to tmp_dir + :type host_tmp_dir: str :param mem_limit: Maximum amount of memory the container can use. Either a float value, which represents the limit in bytes, or a string like ``128m`` or ``1g``. :type mem_limit: float or str @@ -101,6 +104,7 @@ def __init__( docker_url='unix://var/run/docker.sock', environment=None, force_pull=False, + host_tmp_dir=None, mem_limit=None, network_mode=None, tls_ca_cert=None, @@ -125,6 +129,7 @@ def __init__( self.docker_url = docker_url self.environment = environment or {} self.force_pull = force_pull + self.host_tmp_dir = host_tmp_dir self.image = image self.mem_limit = mem_limit self.network_mode = network_mode @@ -179,7 +184,7 @@ def execute(self, context): cpu_shares = int(round(self.cpus * 1024)) - with TemporaryDirectory(prefix='airflowtmp') as host_tmp_dir: + with TemporaryDirectory(prefix='airflowtmp', dir=self.host_tmp_dir) as host_tmp_dir: self.environment['AIRFLOW_TMP_DIR'] = self.tmp_dir self.volumes.append('{0}:{1}'.format(host_tmp_dir, self.tmp_dir)) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000000000..e0f58b87c59e5 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +cryptography==2.0.3 +docker==2.5.1 +jsonschema==2.6.0 +tqdm==4.18.0 diff --git a/tests/operators/docker_operator.py b/tests/operators/docker_operator.py index a12b6f829f8a3..4b83c1f22e100 100644 --- a/tests/operators/docker_operator.py +++ b/tests/operators/docker_operator.py @@ -17,8 +17,8 @@ try: from airflow.operators.docker_operator import DockerOperator + from docker.client import APIClient as Client from airflow.hooks.docker_hook import DockerHook - from docker import Client except ImportError: pass @@ -51,6 +51,7 @@ def test_execute(self, client_class_mock, mkdtemp_mock): client_class_mock.return_value = client_mock operator = DockerOperator(api_version='1.19', command='env', environment={'UNIT': 'TEST'}, + host_tmp_dir='/host/airflow', image='ubuntu:latest', network_mode='bridge', owner='unittest', task_id='unittest', volumes=['/host/path:/container/path'], working_dir='/container/path') @@ -69,6 +70,8 @@ def test_execute(self, client_class_mock, mkdtemp_mock): mem_limit=None, user=None, working_dir='/container/path' ) + mkdtemp_mock.assert_called_with(dir='/host/airflow', + prefix='airflowtmp', suffix='') client_mock.create_host_config.assert_called_with(binds=['/host/path:/container/path', '/mkdtemp:/tmp/airflow'], network_mode='bridge')