Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow/hooks/docker_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from docker import Client
from docker import APIClient as Client
from docker.errors import APIError

from airflow.exceptions import AirflowException
Expand Down
9 changes: 7 additions & 2 deletions airflow/operators/docker_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.file import TemporaryDirectory
from docker import Client, tls
from docker import APIClient as Client, tls
import ast


Expand Down Expand Up @@ -53,6 +53,9 @@ class DockerOperator(BaseOperator):
:type environment: dict
:param force_pull: Pull the docker image on every run. Default is false.
:type force_pull: bool
:param host_tmp_dir: Specify the location of the temporary directory on the host which will
be mapped to tmp_dir
:type host_tmp_dir: str
:param mem_limit: Maximum amount of memory the container can use. Either a float value, which
represents the limit in bytes, or a string like ``128m`` or ``1g``.
:type mem_limit: float or str
Expand Down Expand Up @@ -101,6 +104,7 @@ def __init__(
docker_url='unix://var/run/docker.sock',
environment=None,
force_pull=False,
host_tmp_dir=None,
mem_limit=None,
network_mode=None,
tls_ca_cert=None,
Expand All @@ -125,6 +129,7 @@ def __init__(
self.docker_url = docker_url
self.environment = environment or {}
self.force_pull = force_pull
self.host_tmp_dir = host_tmp_dir
self.image = image
self.mem_limit = mem_limit
self.network_mode = network_mode
Expand Down Expand Up @@ -179,7 +184,7 @@ def execute(self, context):

cpu_shares = int(round(self.cpus * 1024))

with TemporaryDirectory(prefix='airflowtmp') as host_tmp_dir:
with TemporaryDirectory(prefix='airflowtmp', dir=self.host_tmp_dir) as host_tmp_dir:
self.environment['AIRFLOW_TMP_DIR'] = self.tmp_dir
self.volumes.append('{0}:{1}'.format(host_tmp_dir, self.tmp_dir))

Expand Down
4 changes: 4 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
cryptography==2.0.3
docker==2.5.1
jsonschema==2.6.0
tqdm==4.18.0
5 changes: 4 additions & 1 deletion tests/operators/docker_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

try:
from airflow.operators.docker_operator import DockerOperator
from docker.client import APIClient as Client
from airflow.hooks.docker_hook import DockerHook
from docker import Client
except ImportError:
pass

Expand Down Expand Up @@ -51,6 +51,7 @@ def test_execute(self, client_class_mock, mkdtemp_mock):
client_class_mock.return_value = client_mock

operator = DockerOperator(api_version='1.19', command='env', environment={'UNIT': 'TEST'},
host_tmp_dir='/host/airflow',
image='ubuntu:latest', network_mode='bridge', owner='unittest',
task_id='unittest', volumes=['/host/path:/container/path'],
working_dir='/container/path')
Expand All @@ -69,6 +70,8 @@ def test_execute(self, client_class_mock, mkdtemp_mock):
mem_limit=None, user=None,
working_dir='/container/path'
)
mkdtemp_mock.assert_called_with(dir='/host/airflow',
prefix='airflowtmp', suffix='')
client_mock.create_host_config.assert_called_with(binds=['/host/path:/container/path',
'/mkdtemp:/tmp/airflow'],
network_mode='bridge')
Expand Down