Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions airflow/example_dags/docker_copy_data.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''
This sample "listen to directory". move the new file and print it, using docker-containers.
The following operators are being used: DockerOperator, BashOperator & ShortCircuitOperator.
Expand Down
13 changes: 13 additions & 0 deletions airflow/example_dags/example_bash_operator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from builtins import range
from airflow.operators import BashOperator, DummyOperator
from airflow.models import DAG
Expand Down
13 changes: 13 additions & 0 deletions airflow/example_dags/example_branch_operator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.operators import BranchPythonOperator, DummyOperator
from airflow.models import DAG
from datetime import datetime, timedelta
Expand Down
13 changes: 13 additions & 0 deletions airflow/example_dags/example_docker_operator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
from airflow import DAG
from airflow.operators import BashOperator
Expand Down
13 changes: 13 additions & 0 deletions airflow/example_dags/example_http_operator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
### Example HTTP operator and sensor
"""
Expand Down
13 changes: 13 additions & 0 deletions airflow/example_dags/example_python_operator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from builtins import range
from airflow.operators import PythonOperator
Expand Down
13 changes: 13 additions & 0 deletions airflow/example_dags/example_short_circuit_operator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.operators import ShortCircuitOperator, DummyOperator
from airflow.models import DAG
import airflow.utils.helpers
Expand Down
13 changes: 13 additions & 0 deletions airflow/example_dags/example_subdag_operator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime

from airflow.models import DAG
Expand Down
14 changes: 13 additions & 1 deletion airflow/example_dags/example_trigger_controller_dag.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@

# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This example illustrates the use of the TriggerDagRunOperator. There are 2
entities at work in this scenario:
1. The Controller DAG - the DAG that conditionally executes the trigger
Expand Down
13 changes: 13 additions & 0 deletions airflow/example_dags/example_trigger_target_dag.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.operators import BashOperator, PythonOperator
from airflow.models import DAG
from datetime import datetime
Expand Down
13 changes: 13 additions & 0 deletions airflow/example_dags/example_xcom.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import airflow
from datetime import datetime, timedelta
Expand Down
29 changes: 29 additions & 0 deletions airflow/example_dags/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Used for unit tests"""
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime

dag = DAG(
dag_id='test_utils',
schedule_interval=None,
)

task = BashOperator(
task_id='sleeps_forever',
dag=dag,
bash_command="sleep 10000000000",
start_date=datetime(2016, 1, 1),
owner='airflow')
46 changes: 29 additions & 17 deletions airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from past.builtins import basestring
from collections import defaultdict, Counter
from datetime import datetime
from datetime import datetime, timedelta
import getpass
import logging
import socket
Expand Down Expand Up @@ -116,7 +116,7 @@ def on_kill(self):
'''
pass

def heartbeat_callback(self):
def heartbeat_callback(self, session=None):
pass

def heartbeat(self):
Expand All @@ -139,7 +139,7 @@ def heartbeat(self):
sleep at all.
'''
session = settings.Session()
job = session.query(BaseJob).filter(BaseJob.id == self.id).first()
job = session.query(BaseJob).filter_by(id=self.id).one()

if job.state == State.SHUTDOWN:
self.kill()
Expand All @@ -154,9 +154,9 @@ def heartbeat(self):

session.merge(job)
session.commit()
session.close()

self.heartbeat_callback()
self.heartbeat_callback(session=session)
session.close()
self.logger.debug('[heart] Boom.')

def run(self):
Expand Down Expand Up @@ -378,15 +378,15 @@ def import_errors(self, dagbag):
filename=filename, stacktrace=stacktrace))
session.commit()

def schedule_dag(self, dag):
@provide_session
def schedule_dag(self, dag, session=None):
"""
This method checks whether a new DagRun needs to be created
for a DAG based on scheduling interval
Returns DagRun if one is scheduled. Otherwise returns None.
"""
if dag.schedule_interval:
DagRun = models.DagRun
session = settings.Session()
active_runs = DagRun.find(
dag_id=dag.dag_id,
state=State.RUNNING,
Expand Down Expand Up @@ -799,10 +799,10 @@ def _execute(self):
finally:
settings.Session.remove()
executor.end()

session.close()

def heartbeat_callback(self):
@provide_session
def heartbeat_callback(self, session=None):
Stats.gauge('scheduler_heartbeat', 1, 1)


Expand Down Expand Up @@ -1093,6 +1093,15 @@ def __init__(
self.pool = pool
self.pickle_id = pickle_id
self.mark_success = mark_success

# terminating state is used so that a job don't try to
# terminate multiple times
self.terminating = False

# Keeps track of the fact that the task instance has been observed
# as running at least once
self.was_running = False

super(LocalTaskJob, self).__init__(*args, **kwargs)

def _execute(self):
Expand All @@ -1115,23 +1124,26 @@ def _execute(self):
def on_kill(self):
self.process.terminate()

"""
def heartbeat_callback(self):
if datetime.now() - self.start_date < timedelta(seconds=300):
@provide_session
def heartbeat_callback(self, session=None):
"""Self destruct task if state has been moved away from running externally"""

if self.terminating:
# task is already terminating, let it breathe
return

# Suicide pill
TI = models.TaskInstance
ti = self.task_instance
session = settings.Session()
state = session.query(TI.state).filter(
TI.dag_id==ti.dag_id, TI.task_id==ti.task_id,
TI.execution_date==ti.execution_date).scalar()
session.commit()
session.close()
if state != State.RUNNING:
if state == State.RUNNING:
self.was_running = True
elif self.was_running and hasattr(self, 'process'):
logging.warning(
"State of this instance has been externally set to "
"{self.task_instance.state}. "
"Taking the poison pill. So long.".format(**locals()))
self.process.terminate()
"""
self.terminating = True
27 changes: 21 additions & 6 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1179,17 +1179,32 @@ def pool_full(self, session):
def run(
self,
verbose=True,
ignore_dependencies=False, # Doesn't check for deps, just runs
ignore_depends_on_past=False, # Ignore depends_on_past but respect
# other deps
force=False, # Disregards previous successes
mark_success=False, # Don't run the task, act as if it succeeded
test_mode=False, # Doesn't record success or failure in the DB
ignore_dependencies=False,
ignore_depends_on_past=False,
force=False,
mark_success=False,
test_mode=False,
job_id=None,
pool=None,
session=None):
"""
Runs the task instance.

:param verbose: whether to turn on more verbose loggin
:type verbose: boolean
:param ignore_dependencies: Doesn't check for deps, just runs
:type ignore_dependencies: boolean
:param ignore_depends_on_past: Ignore depends_on_past but respect
other dependencies
:type ignore_depends_on_past: boolean
:param force: Forces a run regarless of previous success
:type force: boolean
:param mark_success: Don't run the task, mark its state as success
:type mark_success: boolean
:param test_mode: Doesn't record success or failure in the DB
:type test_mode: boolean
:param pool: specifies the pool to use to run the task instance
:type pool: str
"""
task = self.task
self.pool = pool or task.pool
Expand Down
Loading