diff --git a/airflow/example_dags/docker_copy_data.py b/airflow/example_dags/docker_copy_data.py
index ccf84c16cd1e9..f0789b124225e 100644
--- a/airflow/example_dags/docker_copy_data.py
+++ b/airflow/example_dags/docker_copy_data.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 '''
 This sample "listen to directory". move the new file and print it, using docker-containers.
 The following operators are being used: DockerOperator, BashOperator & ShortCircuitOperator.
diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py
index 4ab9144fa45e9..c759f4d9ecddf 100644
--- a/airflow/example_dags/example_bash_operator.py
+++ b/airflow/example_dags/example_bash_operator.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from builtins import range
 from airflow.operators import BashOperator, DummyOperator
 from airflow.models import DAG
diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py
index f576d20866450..edd177a15fe4c 100644
--- a/airflow/example_dags/example_branch_operator.py
+++ b/airflow/example_dags/example_branch_operator.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from airflow.operators import BranchPythonOperator, DummyOperator
 from airflow.models import DAG
 from datetime import datetime, timedelta
diff --git a/airflow/example_dags/example_docker_operator.py b/airflow/example_dags/example_docker_operator.py
index e014fe5d5ba08..6bb71ff2a7da5 100644
--- a/airflow/example_dags/example_docker_operator.py
+++ b/airflow/example_dags/example_docker_operator.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 """
 from airflow import DAG
 from airflow.operators import BashOperator
diff --git a/airflow/example_dags/example_http_operator.py b/airflow/example_dags/example_http_operator.py
index 45018252e9e52..41ea38501f594 100644
--- a/airflow/example_dags/example_http_operator.py
+++ b/airflow/example_dags/example_http_operator.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 """
 ### Example HTTP operator and sensor
 """
diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py
index 0fc2180d630b3..a2f8abd5a6961 100644
--- a/airflow/example_dags/example_python_operator.py
+++ b/airflow/example_dags/example_python_operator.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from __future__ import print_function
 from builtins import range
 from airflow.operators import PythonOperator
diff --git a/airflow/example_dags/example_short_circuit_operator.py b/airflow/example_dags/example_short_circuit_operator.py
index 967c65edc3f5c..907cf51fa8b44 100644
--- a/airflow/example_dags/example_short_circuit_operator.py
+++ b/airflow/example_dags/example_short_circuit_operator.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from airflow.operators import ShortCircuitOperator, DummyOperator
 from airflow.models import DAG
 import airflow.utils.helpers
diff --git a/airflow/example_dags/example_subdag_operator.py b/airflow/example_dags/example_subdag_operator.py
index 120f33307097d..57a62c664919a 100644
--- a/airflow/example_dags/example_subdag_operator.py
+++ b/airflow/example_dags/example_subdag_operator.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from datetime import datetime
 
 from airflow.models import DAG
diff --git a/airflow/example_dags/example_trigger_controller_dag.py b/airflow/example_dags/example_trigger_controller_dag.py
index 4b66ad1513d72..b754d643584c6 100644
--- a/airflow/example_dags/example_trigger_controller_dag.py
+++ b/airflow/example_dags/example_trigger_controller_dag.py
@@ -1,4 +1,16 @@
-
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 """This example illustrates the use of the TriggerDagRunOperator. There are 2
 entities at work in this scenario:
 1. The Controller DAG - the DAG that conditionally executes the trigger
diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py
index 172003f05fc2a..41a3e36c82ed1 100644
--- a/airflow/example_dags/example_trigger_target_dag.py
+++ b/airflow/example_dags/example_trigger_target_dag.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from airflow.operators import BashOperator, PythonOperator
 from airflow.models import DAG
 from datetime import datetime
diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py
index 2d9c087c7a37c..71cd44ec60b7c 100644
--- a/airflow/example_dags/example_xcom.py
+++ b/airflow/example_dags/example_xcom.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from __future__ import print_function
 import airflow
 from datetime import datetime, timedelta
diff --git a/airflow/example_dags/test_utils.py b/airflow/example_dags/test_utils.py
new file mode 100644
index 0000000000000..38e50d0e5b620
--- /dev/null
+++ b/airflow/example_dags/test_utils.py
@@ -0,0 +1,29 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Used for unit tests"""
+from airflow.operators import BashOperator
+from airflow.models import DAG
+from datetime import datetime
+
+dag = DAG(
+    dag_id='test_utils',
+    schedule_interval=None,
+)
+
+task = BashOperator(
+    task_id='sleeps_forever',
+    dag=dag,
+    bash_command="sleep 10000000000",
+    start_date=datetime(2016, 1, 1),
+    owner='airflow')
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 1e583ac41be77..0713bbe8d0153 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -19,7 +19,7 @@
 
 from past.builtins import basestring
 from collections import defaultdict, Counter
-from datetime import datetime
+from datetime import datetime, timedelta
 import getpass
 import logging
 import socket
@@ -116,7 +116,7 @@ def on_kill(self):
         '''
         pass
 
-    def heartbeat_callback(self):
+    def heartbeat_callback(self, session=None):
         pass
 
     def heartbeat(self):
@@ -139,7 +139,7 @@
         sleep at all.
         '''
         session = settings.Session()
-        job = session.query(BaseJob).filter(BaseJob.id == self.id).first()
+        job = session.query(BaseJob).filter_by(id=self.id).one()
 
         if job.state == State.SHUTDOWN:
             self.kill()
@@ -154,9 +154,9 @@
 
         session.merge(job)
         session.commit()
-        session.close()
 
-        self.heartbeat_callback()
+        self.heartbeat_callback(session=session)
+        session.close()
         self.logger.debug('[heart] Boom.')
 
     def run(self):
@@ -378,7 +378,8 @@ def import_errors(self, dagbag):
                 filename=filename, stacktrace=stacktrace))
         session.commit()
 
-    def schedule_dag(self, dag):
+    @provide_session
+    def schedule_dag(self, dag, session=None):
         """
         This method checks whether a new DagRun needs to be created
         for a DAG based on scheduling interval
@@ -386,7 +387,6 @@
         """
         if dag.schedule_interval:
             DagRun = models.DagRun
-            session = settings.Session()
             active_runs = DagRun.find(
                 dag_id=dag.dag_id,
                 state=State.RUNNING,
@@ -799,10 +799,10 @@ def _execute(self):
         finally:
             settings.Session.remove()
         executor.end()
-        session.close()
 
-    def heartbeat_callback(self):
+    @provide_session
+    def heartbeat_callback(self, session=None):
         Stats.gauge('scheduler_heartbeat', 1, 1)
 
@@ -1093,6 +1093,15 @@ def __init__(
         self.pool = pool
         self.pickle_id = pickle_id
         self.mark_success = mark_success
+
+        # terminating state is used so that a job doesn't try to
+        # terminate multiple times
+        self.terminating = False
+
+        # Keeps track of the fact that the task instance has been observed
+        # as running at least once
+        self.was_running = False
+
         super(LocalTaskJob, self).__init__(*args, **kwargs)
 
     def _execute(self):
@@ -1115,23 +1124,26 @@ def _execute(self):
     def on_kill(self):
         self.process.terminate()
 
-    """
-    def heartbeat_callback(self):
-        if datetime.now() - self.start_date < timedelta(seconds=300):
+    @provide_session
+    def heartbeat_callback(self, session=None):
+        """Self destruct task if state has been moved away from running externally"""
+
+        if self.terminating:
+            # task is already terminating, let it breathe
             return
+
         # Suicide pill
         TI = models.TaskInstance
         ti = self.task_instance
-        session = settings.Session()
         state = session.query(TI.state).filter(
             TI.dag_id==ti.dag_id, TI.task_id==ti.task_id,
             TI.execution_date==ti.execution_date).scalar()
-        session.commit()
-        session.close()
-        if state != State.RUNNING:
+        if state == State.RUNNING:
+            self.was_running = True
+        elif self.was_running and hasattr(self, 'process'):
             logging.warning(
                 "State of this instance has been externally set to "
                 "{self.task_instance.state}. "
                 "Taking the poison pill. So long.".format(**locals()))
             self.process.terminate()
-    """
+            self.terminating = True
diff --git a/airflow/models.py b/airflow/models.py
index 38359f7644dee..09d880ed4e903 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1179,17 +1179,32 @@ def pool_full(self, session):
     def run(
             self,
             verbose=True,
-            ignore_dependencies=False,  # Doesn't check for deps, just runs
-            ignore_depends_on_past=False,  # Ignore depends_on_past but respect
-                                           # other deps
-            force=False,  # Disregards previous successes
-            mark_success=False,  # Don't run the task, act as if it succeeded
-            test_mode=False,  # Doesn't record success or failure in the DB
+            ignore_dependencies=False,
+            ignore_depends_on_past=False,
+            force=False,
+            mark_success=False,
+            test_mode=False,
             job_id=None,
             pool=None,
             session=None):
         """
         Runs the task instance.
+
+        :param verbose: whether to turn on more verbose logging
+        :type verbose: boolean
+        :param ignore_dependencies: Doesn't check for deps, just runs
+        :type ignore_dependencies: boolean
+        :param ignore_depends_on_past: Ignore depends_on_past but respect
+            other dependencies
+        :type ignore_depends_on_past: boolean
+        :param force: Forces a run regardless of previous success
+        :type force: boolean
+        :param mark_success: Don't run the task, mark its state as success
+        :type mark_success: boolean
+        :param test_mode: Doesn't record success or failure in the DB
+        :type test_mode: boolean
+        :param pool: specifies the pool to use to run the task instance
+        :type pool: str
         """
         task = self.task
         self.pool = pool or task.pool
diff --git a/tests/core.py b/tests/core.py
index 5e6a4fd65ee64..2ab14ea4bfd19 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -19,6 +19,7 @@
 import os
 import re
 import unittest
+import multiprocessing
 import mock
 import tempfile
 from datetime import datetime, time, timedelta
@@ -49,7 +50,7 @@
 
 import six
 
-NUM_EXAMPLE_DAGS = 15
+NUM_EXAMPLE_DAGS = 16
 DEV_NULL = '/dev/null'
 TEST_DAG_FOLDER = os.path.join(
     os.path.dirname(os.path.realpath(__file__)), 'dags')
@@ -629,6 +630,45 @@ def test_bad_trigger_rule(self):
             trigger_rule="non_existant",
             dag=self.dag)
 
+    def test_terminate_task(self):
+        """If a task instance's db state gets deleted, it should fail"""
+        TI = models.TaskInstance
+        dag = self.dagbag.dags.get('test_utils')
+        task = dag.task_dict.get('sleeps_forever')
+
+        ti = TI(task=task, execution_date=DEFAULT_DATE)
+        job = jobs.LocalTaskJob(
+            task_instance=ti, force=True, executor=SequentialExecutor())
+
+        # Running task instance asynchronously
+        p = multiprocessing.Process(target=job.run)
+        p.start()
+        sleep(5)
+        settings.engine.dispose()
+        session = settings.Session()
+        ti.refresh_from_db(session=session)
+        # making sure it's actually running
+        assert State.RUNNING == ti.state
+        ti = (
+            session.query(TI)
+            .filter_by(
+                dag_id=task.dag_id,
+                task_id=task.task_id,
+                execution_date=DEFAULT_DATE)
+            .one()
+        )
+        # deleting the instance should result in a failure
+        session.delete(ti)
+        session.commit()
+        # waiting for the async task to finish
+        p.join()
+
+        # making sure that the task ended up as failed
+        ti.refresh_from_db(session=session)
+        assert State.FAILED == ti.state
+        session.close()
+
+
 class CliTests(unittest.TestCase):
     def setUp(self):
         configuration.test_mode()
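Note: several hunks above move schedule_dag and the heartbeat_callback implementations onto a @provide_session decorator without showing the decorator itself. The sketch below is a minimal, self-contained illustration of how such a decorator can inject and clean up a SQLAlchemy session when the caller does not pass one; it is not Airflow's actual implementation, and the in-memory SQLite engine and the ping function are placeholders for this example only. Passing an explicit session, as heartbeat() does with heartbeat_callback(session=session), bypasses the decorator's session management so both calls can share one session.

# Minimal sketch of a provide_session-style decorator (illustration only, not
# Airflow's implementation; the engine and `ping` below are placeholders).
# If the caller omits `session`, the wrapper opens one, commits on success,
# and always closes it; an explicitly passed session is reused untouched.
import functools

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

engine = create_engine("sqlite:///:memory:")  # placeholder engine for the example
Session = sessionmaker(bind=engine)


def provide_session(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if kwargs.get("session") is not None:
            # Caller supplied a session: reuse it and leave its lifecycle alone.
            return func(*args, **kwargs)
        session = Session()
        kwargs["session"] = session
        try:
            result = func(*args, **kwargs)
            session.commit()
            return result
        finally:
            session.close()
    return wrapper


@provide_session
def ping(session=None):
    # The body can rely on `session` always being usable.
    return session.execute(text("SELECT 1")).scalar()


print(ping())  # -> 1; the decorator created and closed the session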