
Conversation


@mistercrunch mistercrunch commented Jun 10, 2016

Tasks now poke at their own state and self-terminate if they are not
running. This allows users to delete a task instance without leaving
zombie tasks running on workers.
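The mechanism can be sketched roughly as follows. This is a minimal illustration, not the actual Airflow code: `fetch_state` and `terminate` are hypothetical stand-ins for the database query and the process kill, and `RUNNING` stands in for the task-instance state constant.

```python
RUNNING = "running"  # stand-in for the State.RUNNING constant

class TaskHeartbeat:
    """Sketch: on each heartbeat, re-read this task's state from the
    database and self-terminate if it is no longer RUNNING."""

    def __init__(self, fetch_state, terminate):
        self.fetch_state = fetch_state  # callable: current state from the DB
        self.terminate = terminate      # callable: kill the running task
        self.terminating = False

    def heartbeat_callback(self):
        # Poke at our own state; if it was changed externally (e.g. the
        # user deleted the task instance), take the poison pill.
        if self.fetch_state() != RUNNING:
            self.terminating = True
            self.terminate()

kills = []
# State was externally set to None: the callback should terminate the task.
hb = TaskHeartbeat(fetch_state=lambda: None,
                   terminate=lambda: kills.append("SIGTERM"))
hb.heartbeat_callback()
print(hb.terminating, kills)  # → True ['SIGTERM']
```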

Tested manually by running a long-sleeping task and deleting its state while it was running; here's the log output for that task:

--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-06-10 10:51:23,558] {models.py:1273} INFO - Executing <Task(BashOperator): runme_0> on 2016-01-01 00:00:00
[2016-06-10 10:51:23,568] {bash_operator.py:55} INFO - tmp dir root location: 
/var/folders/25/sl1p7zbn5vn5vd507y9g_ddw0000gp/T
[2016-06-10 10:51:23,569] {bash_operator.py:64} INFO - Temporary script location :/var/folders/25/sl1p7zbn5vn5vd507y9g_ddw0000gp/T/airflowtmpX34knh//var/folders/25/sl1p7zbn5vn5vd507y9g_ddw0000gp/T/airflowtmpX34knh/runme_0UY7Dju
[2016-06-10 10:51:23,569] {bash_operator.py:65} INFO - Running command: echo "example_bash_operator__runme_0__20160101" && sleep 10000000
[2016-06-10 10:51:23,573] {bash_operator.py:73} INFO - Output:
[2016-06-10 10:51:23,575] {bash_operator.py:77} INFO - example_bash_operator__runme_0__20160101
[2016-06-10 11:03:52,003] {jobs.py:1102} WARNING - State of this instance has been externally set to None. Taking the poison pill. So long.
[2016-06-10 11:03:52,004] {models.py:1282} ERROR - Killing subprocess
[2016-06-10 11:03:52,004] {bash_operator.py:89} INFO - Sending SIGTERM signal to bash subprocess
[2016-06-10 11:03:52,004] {models.py:1340} ERROR - Task received SIGTERM signal
Traceback (most recent call last):
  File "/Users/maxime_beauchemin/code/Airflow/airflow/models.py", line 1299, in run
    result = task_copy.execute(context=context)
  File "/Users/maxime_beauchemin/code/Airflow/airflow/operators/bash_operator.py", line 75, in execute
    for line in iter(sp.stdout.readline, b''):
  File "/Users/maxime_beauchemin/code/Airflow/airflow/models.py", line 1284, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
AirflowException: Task received SIGTERM signal
[2016-06-10 11:03:52,007] {models.py:1360} INFO - Marking task as FAILED.
[2016-06-10 11:03:52,013] {models.py:1381} ERROR - Task received SIGTERM signal
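The log above shows the pattern: a SIGTERM handler installed around task execution tears down the bash subprocess and re-raises as an `AirflowException`, so the external kill surfaces as a task failure. A self-contained sketch of that pattern under assumed names (this is not the actual `models.py`/`bash_operator.py` code):

```python
import signal
import subprocess

class AirflowException(Exception):
    pass

def run_with_sigterm_handler(cmd):
    """Run a shell command, streaming its output, with a SIGTERM handler
    that kills the subprocess and raises, mirroring the log above."""
    sp = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)

    def signal_handler(signum, frame):
        # Forward the termination to the bash subprocess, then surface
        # the kill as a task failure.
        sp.terminate()
        raise AirflowException("Task received SIGTERM signal")

    signal.signal(signal.SIGTERM, signal_handler)
    try:
        for line in iter(sp.stdout.readline, b''):
            print(line.rstrip().decode())
        return sp.wait()
    finally:
        # Restore default handling once the task is done.
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
```

If no SIGTERM arrives, the command runs to completion and its exit code is returned; a SIGTERM mid-stream interrupts the `readline` loop, exactly as in the traceback above.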


codecov-io commented Jun 10, 2016

Current coverage is 68.12%

Merging #1585 into master will increase coverage by 0.11%

@@             master      #1585   diff @@
==========================================
  Files           116        119     +3   
  Lines          8312       8336    +24   
  Methods           0          0          
  Messages          0          0          
  Branches          0          0          
==========================================
+ Hits           5653       5679    +26   
+ Misses         2659       2657     -2   
  Partials          0          0          

Powered by Codecov. Last updated by 06e70e2...f6fecef

@mistercrunch mistercrunch changed the title from [AIRFLOW-234] make task that aren't self-terminate to [AIRFLOW-234] make task that aren't running self-terminate Jun 10, 2016
airflow/jobs.py Outdated
Contributor

@bolkedebruin bolkedebruin Jun 11, 2016


Why the commit()? There is no change in the state of any of the objects that get saved to the db.


bolkedebruin commented Jun 11, 2016

As promised, a test for this:

# Imports assumed for this snippet (module paths per Airflow at the time):
import datetime
import unittest

import mock

from airflow import settings
from airflow.jobs import LocalTaskJob, SchedulerJob
from airflow.models import DAG, DagModel
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.state import State

DEFAULT_DATE = datetime.datetime(2016, 1, 1)  # the test suite's conventional default


class LocalTaskJobTest(unittest.TestCase):
    def test_terminate_task(self):
        dag = DAG(
            dag_id='test_terminate_task',
            start_date=DEFAULT_DATE
        )
        dag_task1 = DummyOperator(
            task_id='dummy',
            dag=dag,
            owner='airflow')

        # Register the DAG so the scheduler can see it.
        session = settings.Session()
        orm_dag = DagModel(dag_id=dag.dag_id)
        session.merge(orm_dag)
        session.commit()

        scheduler = SchedulerJob()
        dag.clear()

        dr = scheduler.schedule_dag(dag)
        self.assertIsNotNone(dr)

        queue = mock.Mock()
        scheduler.process_dag(dag, queue=queue)

        # Each task instance is now SCHEDULED (not RUNNING), so the
        # heartbeat callback should take the poison pill and terminate.
        tis = dr.get_task_instances(session=session)
        for ti in tis:
            self.assertEqual(State.SCHEDULED, ti.state)
            process = mock.Mock()
            job = LocalTaskJob(task_instance=ti)
            job.process = process
            job.start_date = DEFAULT_DATE
            job.heartbeat_callback()
            self.assertEqual(True, job.terminating)
            process.terminate.assert_called()

        session.close()

Please note that if #1581 lands, the test will need to be updated, since the state will no longer be set to SCHEDULED by "process_dag"; that should not be a large change, and we could even leave it checking for "None".

@mistercrunch mistercrunch force-pushed the undeads branch 4 times, most recently from 472f9f4 to f6fecef Compare June 15, 2016 21:16
@mistercrunch
Member Author

@bolkedebruin I hit some issues with the mocking approach and found a way to run a task asynchronously.

I think this is good to go now.
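The asynchronous approach can be mimicked with a toy sketch: multiprocessing stands in for the worker, and a shared value stands in for the task-instance state row in the database. Names and timings are illustrative; this is not the PR's actual test.

```python
import multiprocessing
import time

def task_loop(state):
    """Stand-in for a worker task: between 'heartbeats', poke at the
    shared state and exit once it is no longer b'r' (running)."""
    while state.value == b'r':
        time.sleep(0.05)

def run_demo():
    state = multiprocessing.Value('c', b'r')   # fake task-instance state
    p = multiprocessing.Process(target=task_loop, args=(state,))
    p.start()                                   # "task" running asynchronously
    time.sleep(0.2)
    state.value = b'n'                          # simulate deleting the task instance
    p.join(timeout=2)
    return p.is_alive()                         # False: it self-terminated

if __name__ == "__main__":
    assert run_demo() is False
```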

@bolkedebruin
Contributor

👍, rebase needed, sorry!

@mistercrunch mistercrunch force-pushed the undeads branch 2 times, most recently from 7e2c48e to c2f9460 Compare June 17, 2016 15:07