From 0681548a8849ec607e61c34c0a432664d51e48c6 Mon Sep 17 00:00:00 2001 From: Lokesh Lal Date: Tue, 31 Dec 2019 09:44:55 +0530 Subject: [PATCH 01/18] adding pool capacity required for each task for dynamic pooling --- airflow/models/baseoperator.py | 2 ++ airflow/models/pool.py | 12 ++++++------ airflow/models/taskinstance.py | 8 ++++++++ airflow/ti_deps/deps/pool_slots_available_dep.py | 4 ++-- tests/ti_deps/deps/test_pool_slots_available_dep.py | 8 ++++---- tests/www_rbac/_trial_temp/_trial_marker | 0 tests/www_rbac/_trial_temp/test.log | 1 + 7 files changed, 23 insertions(+), 12 deletions(-) create mode 100644 tests/www_rbac/_trial_temp/_trial_marker create mode 100644 tests/www_rbac/_trial_temp/test.log diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index e6cbcd78ea298..7ad823aa861f0 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -313,6 +313,7 @@ def __init__( weight_rule: str = WeightRule.DOWNSTREAM, queue: str = conf.get('celery', 'default_queue'), pool: str = Pool.DEFAULT_POOL_NAME, + pool_capacity: int = 1, sla: Optional[timedelta] = None, execution_timeout: Optional[timedelta] = None, on_execute_callback: Optional[Callable] = None, @@ -381,6 +382,7 @@ def __init__( self.retries = retries self.queue = queue self.pool = pool + self.pool_capacity = pool_capacity self.sla = sla self.execution_timeout = execution_timeout self.on_execute_callback = on_execute_callback diff --git a/airflow/models/pool.py b/airflow/models/pool.py index 6ea4d4aa4d486..1b598f3af9f87 100644 --- a/airflow/models/pool.py +++ b/airflow/models/pool.py @@ -65,11 +65,11 @@ def occupied_slots(self, session): from airflow.models.taskinstance import TaskInstance # Avoid circular import return ( session - .query(func.count()) + .query(func.sum(TaskInstance.pool_capacity)) .filter(TaskInstance.pool == self.pool) .filter(TaskInstance.state.in_(STATES_TO_COUNT_AS_RUNNING)) .scalar() - ) + ) or 0 @provide_session def used_slots(self, session): @@ -80,11 +80,11 @@ def used_slots(self, session): running = ( session - .query(func.count()) + .query(func.sum(TaskInstance.pool_capacity)) .filter(TaskInstance.pool == self.pool) .filter(TaskInstance.state == State.RUNNING) .scalar() - ) + ) or 0 return running @provide_session @@ -96,11 +96,11 @@ def queued_slots(self, session): return ( session - .query(func.count()) + .query(func.sum(TaskInstance.pool_capacity)) .filter(TaskInstance.pool == self.pool) .filter(TaskInstance.state == State.QUEUED) .scalar() - ) + ) or 0 @provide_session def open_slots(self, session): diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index d8d47d1694157..8ff161c36660c 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -154,6 +154,7 @@ class TaskInstance(Base, LoggingMixin): unixname = Column(String(1000)) job_id = Column(Integer) pool = Column(String(50), nullable=False) + pool_capacity = Column(Integer, nullable=False, default=1) queue = Column(String(256)) priority_weight = Column(Integer) operator = Column(String(1000)) @@ -194,6 +195,8 @@ def __init__(self, task, execution_date, state=None): self.queue = task.queue self.pool = task.pool + if hasattr(task, 'pool_capacity'): + self.pool_capacity = task.pool_capacity self.priority_weight = task.priority_weight_total self.try_number = 0 self.max_tries = self.task.retries @@ -458,6 +461,7 @@ def refresh_from_db(self, session=None, lock_for_update=False, refresh_executor_ self.unixname = ti.unixname self.job_id = ti.job_id self.pool = ti.pool + self.pool_capacity = ti.pool_capacity self.queue = ti.queue self.priority_weight = ti.priority_weight self.operator = ti.operator @@ -770,6 +774,8 @@ def _check_and_change_state_before_execution( """ task = self.task self.pool = pool or task.pool + if hasattr(task, 'pool_capacity'): + self.pool_capacity = task.pool_capacity self.test_mode = test_mode self.refresh_from_db(session=session, lock_for_update=True) self.job_id = job_id @@ -885,6 +891,8 @@ def _run_raw_task( task = self.task self.pool = pool or task.pool + if hasattr(task, 'pool_capacity'): + self.pool_capacity = task.pool_capacity self.test_mode = test_mode self.refresh_from_db(session=session) self.job_id = job_id diff --git a/airflow/ti_deps/deps/pool_slots_available_dep.py b/airflow/ti_deps/deps/pool_slots_available_dep.py index e88f04f83fc65..e6f89c426ca2f 100644 --- a/airflow/ti_deps/deps/pool_slots_available_dep.py +++ b/airflow/ti_deps/deps/pool_slots_available_dep.py @@ -62,9 +62,9 @@ def _get_dep_statuses(self, ti, session, dep_context=None): open_slots = pools[0].open_slots() if ti.state in STATES_TO_COUNT_AS_RUNNING: - open_slots += 1 + open_slots += ti.pool_capacity - if open_slots <= 0: + if open_slots <= (ti.pool_capacity - 1): yield self._failing_status( reason=("Not scheduling since there are %s open slots in pool %s", open_slots, pool_name) diff --git a/tests/ti_deps/deps/test_pool_slots_available_dep.py b/tests/ti_deps/deps/test_pool_slots_available_dep.py index 74f277ecb9abc..271df584266ab 100644 --- a/tests/ti_deps/deps/test_pool_slots_available_dep.py +++ b/tests/ti_deps/deps/test_pool_slots_available_dep.py @@ -40,22 +40,22 @@ def tearDown(self): @patch('airflow.models.Pool.open_slots', return_value=0) # pylint: disable=unused-argument def test_pooled_task_reached_concurrency(self, mock_open_slots): - ti = Mock(pool='test_pool') + ti = Mock(pool='test_pool', pool_capacity=1) self.assertFalse(PoolSlotsAvailableDep().is_met(ti=ti)) @patch('airflow.models.Pool.open_slots', return_value=1) # pylint: disable=unused-argument def test_pooled_task_pass(self, mock_open_slots): - ti = Mock(pool='test_pool') + ti = Mock(pool='test_pool', pool_capacity=1) self.assertTrue(PoolSlotsAvailableDep().is_met(ti=ti)) @patch('airflow.models.Pool.open_slots', return_value=0) # pylint: disable=unused-argument def test_running_pooled_task_pass(self, mock_open_slots): for state in STATES_TO_COUNT_AS_RUNNING: - ti = Mock(pool='test_pool', state=state) + ti = Mock(pool='test_pool', state=state, pool_capacity=1) self.assertTrue(PoolSlotsAvailableDep().is_met(ti=ti)) def test_task_with_nonexistent_pool(self): - ti = Mock(pool='nonexistent_pool') + ti = Mock(pool='nonexistent_pool', pool_capacity=1) self.assertFalse(PoolSlotsAvailableDep().is_met(ti=ti)) diff --git a/tests/www_rbac/_trial_temp/_trial_marker b/tests/www_rbac/_trial_temp/_trial_marker new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/tests/www_rbac/_trial_temp/test.log b/tests/www_rbac/_trial_temp/test.log new file mode 100644 index 0000000000000..dbc05f0e1b861 --- /dev/null +++ b/tests/www_rbac/_trial_temp/test.log @@ -0,0 +1 @@ +2019-12-30 15:37:00+0530 [-] Log opened. From 796c5bb5f8c664bd4affa618b00e4eb69c7549ea Mon Sep 17 00:00:00 2001 From: Lokesh Lal Date: Tue, 31 Dec 2019 13:52:33 +0530 Subject: [PATCH 02/18] Added pool_capacity column migration script --- ...dd_pool_capacity_field_to_task_instance.py | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 airflow/migrations/versions/a4c2fd67d16b_add_pool_capacity_field_to_task_instance.py diff --git a/airflow/migrations/versions/a4c2fd67d16b_add_pool_capacity_field_to_task_instance.py b/airflow/migrations/versions/a4c2fd67d16b_add_pool_capacity_field_to_task_instance.py new file mode 100644 index 0000000000000..2a2a0b91a4f57 --- /dev/null +++ b/airflow/migrations/versions/a4c2fd67d16b_add_pool_capacity_field_to_task_instance.py @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""add pool_capacity field to task_instance + +Revision ID: a4c2fd67d16b +Revises: fe461863935f +Create Date: 2019-12-31 13:50:02.227835 + +""" + +import sqlalchemy as sa +from alembic import op + + +# revision identifiers, used by Alembic. +revision = 'a4c2fd67d16b' +down_revision = 'fe461863935f' +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column('task_instance', sa.Column('pool_capacity', sa.Integer)) + + +def downgrade(): + op.drop_column('task_instance', 'pool_capacity') From 620cb554ff58ca236c76a55dc55e3c767fcc7c35 Mon Sep 17 00:00:00 2001 From: Lokesh Lal Date: Tue, 31 Dec 2019 13:57:24 +0530 Subject: [PATCH 03/18] removed test checkedin file --- tests/www_rbac/_trial_temp/_trial_marker | 0 tests/www_rbac/_trial_temp/test.log | 1 - 2 files changed, 1 deletion(-) delete mode 100644 tests/www_rbac/_trial_temp/_trial_marker delete mode 100644 tests/www_rbac/_trial_temp/test.log diff --git a/tests/www_rbac/_trial_temp/_trial_marker b/tests/www_rbac/_trial_temp/_trial_marker deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/tests/www_rbac/_trial_temp/test.log b/tests/www_rbac/_trial_temp/test.log deleted file mode 100644 index dbc05f0e1b861..0000000000000 --- a/tests/www_rbac/_trial_temp/test.log +++ /dev/null @@ -1 +0,0 @@ -2019-12-30 15:37:00+0530 [-] Log opened. From 7803551ecf87844876d1fc8c7b40aefdf2d18452 Mon Sep 17 00:00:00 2001 From: Lokesh Lal Date: Tue, 31 Dec 2019 14:35:14 +0530 Subject: [PATCH 04/18] removed extra space --- .../a4c2fd67d16b_add_pool_capacity_field_to_task_instance.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow/migrations/versions/a4c2fd67d16b_add_pool_capacity_field_to_task_instance.py b/airflow/migrations/versions/a4c2fd67d16b_add_pool_capacity_field_to_task_instance.py index 2a2a0b91a4f57..f0c5f4a6bf456 100644 --- a/airflow/migrations/versions/a4c2fd67d16b_add_pool_capacity_field_to_task_instance.py +++ b/airflow/migrations/versions/a4c2fd67d16b_add_pool_capacity_field_to_task_instance.py @@ -27,7 +27,6 @@ import sqlalchemy as sa from alembic import op - # revision identifiers, used by Alembic. revision = 'a4c2fd67d16b' down_revision = 'fe461863935f' From b7ee7f84a4d032f1cb1bd727b157afd45117d09c Mon Sep 17 00:00:00 2001 From: Lokesh Lal Date: Tue, 31 Dec 2019 15:56:48 +0530 Subject: [PATCH 05/18] correct test_database_schema_and_sqlalchemy_model_are_in_sync test case --- .../a4c2fd67d16b_add_pool_capacity_field_to_task_instance.py | 2 +- airflow/models/taskinstance.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/migrations/versions/a4c2fd67d16b_add_pool_capacity_field_to_task_instance.py b/airflow/migrations/versions/a4c2fd67d16b_add_pool_capacity_field_to_task_instance.py index f0c5f4a6bf456..2e2130cc90f41 100644 --- a/airflow/migrations/versions/a4c2fd67d16b_add_pool_capacity_field_to_task_instance.py +++ b/airflow/migrations/versions/a4c2fd67d16b_add_pool_capacity_field_to_task_instance.py @@ -35,7 +35,7 @@ def upgrade(): - op.add_column('task_instance', sa.Column('pool_capacity', sa.Integer)) + op.add_column('task_instance', sa.Column('pool_capacity', sa.Integer, default=1)) def downgrade(): diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 8ff161c36660c..c897736ab9f63 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -154,7 +154,7 @@ class TaskInstance(Base, LoggingMixin): unixname = Column(String(1000)) job_id = Column(Integer) pool = Column(String(50), nullable=False) - pool_capacity = Column(Integer, nullable=False, default=1) + pool_capacity = Column('pool_capacity', Integer, default=1) queue = Column(String(256)) priority_weight = Column(Integer) operator = Column(String(1000)) From cc0f340d654ae65a2e437ef6d41394ea6784cda2 Mon Sep 17 00:00:00 2001 From: Lokesh Lal Date: Tue, 31 Dec 2019 17:52:01 +0530 Subject: [PATCH 06/18] Added description for pool_capacity property for task instance --- airflow/models/baseoperator.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 7ad823aa861f0..de6f7943f50fa 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -178,6 +178,8 @@ class derived from this one results in the creation of a task object, :param pool: the slot pool this task should run in, slot pools are a way to limit concurrency for certain tasks :type pool: str + :param pool_capacity: the number of pool slots this task should use + :type pool_capacity: int :param sla: time by which the job is expected to succeed. Note that this represents the ``timedelta`` after the period is closed. For example if you set an SLA of 1 hour, the scheduler would send an email From df17cbc46716cfde7f38384e5ddf901bf6f572f1 Mon Sep 17 00:00:00 2001 From: Lokesh Lal Date: Thu, 2 Jan 2020 16:37:13 +0530 Subject: [PATCH 07/18] Modified test cases to include pool_capacity along with pool in task instances --- tests/models/test_taskinstance.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index cab4715610212..bfb173d7797f3 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -302,6 +302,7 @@ def test_not_requeue_non_requeueable_task_instance(self): task_id='test_not_requeue_non_requeueable_task_instance_op', dag=dag, pool='test_pool', + pool_capacity=1, owner='airflow', start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) ti = TI( @@ -344,6 +345,7 @@ def test_mark_non_runnable_task_as_success(self): task_id='test_mark_non_runnable_task_as_success_op', dag=dag, pool='test_pool', + pool_capacity=1, owner='airflow', start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) ti = TI( @@ -361,7 +363,7 @@ def test_run_pooling_task(self): """ dag = models.DAG(dag_id='test_run_pooling_task') task = DummyOperator(task_id='test_run_pooling_task_op', dag=dag, - pool='test_pool', owner='airflow', + pool='test_pool', pool_capacity=1, owner='airflow', start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) ti = TI( task=task, execution_date=timezone.utcnow()) @@ -407,6 +409,7 @@ def test_run_pooling_task_with_mark_success(self): task_id='test_run_pooling_task_with_mark_success_op', dag=dag, pool='test_pool', + pool_capacity=1, owner='airflow', start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) ti = TI( @@ -619,6 +622,7 @@ def func(): dag=dag, owner='airflow', pool='test_pool', + pool_capacity=1, start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) ti = TI(task=task, execution_date=timezone.utcnow()) @@ -715,6 +719,7 @@ def func(): dag=dag, owner='airflow', pool='test_pool', + pool_capacity=1, start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) ti = TI(task=task, execution_date=timezone.utcnow()) @@ -908,6 +913,7 @@ def test_xcom_pull_after_success(self): task_id='test_xcom', dag=dag, pool='test_xcom', + pool_capacity=1, owner='airflow', start_date=timezone.datetime(2016, 6, 2, 0, 0, 0)) exec_date = timezone.utcnow() @@ -942,6 +948,7 @@ def test_xcom_pull_different_execution_date(self): task_id='test_xcom', dag=dag, pool='test_xcom', + pool_capacity=1, owner='airflow', start_date=timezone.datetime(2016, 6, 2, 0, 0, 0)) exec_date = timezone.utcnow() From 59d76628ee48523e246d8ad97d229ea160218d8b Mon Sep 17 00:00:00 2001 From: Lokesh Lal Date: Thu, 2 Jan 2020 16:41:35 +0530 Subject: [PATCH 08/18] Modified test cases to include pool_capacity along with pool in task instances --- tests/models/test_pool.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/models/test_pool.py b/tests/models/test_pool.py index 21ef645d059d6..babd8ff392923 100644 --- a/tests/models/test_pool.py +++ b/tests/models/test_pool.py @@ -97,7 +97,7 @@ def test_default_pool_open_slots(self): dag_id='test_default_pool_open_slots', start_date=DEFAULT_DATE, ) op1 = DummyOperator(task_id='dummy1', dag=dag) - op2 = DummyOperator(task_id='dummy2', dag=dag) + op2 = DummyOperator(task_id='dummy2', dag=dag, pool_capacity=2) ti1 = TI(task=op1, execution_date=DEFAULT_DATE) ti2 = TI(task=op2, execution_date=DEFAULT_DATE) ti1.state = State.RUNNING @@ -109,4 +109,4 @@ def test_default_pool_open_slots(self): session.commit() session.close() - self.assertEqual(3, Pool.get_default_pool().open_slots()) + self.assertEqual(2, Pool.get_default_pool().open_slots()) From 157aed6dccc8e41639d27a4bb03fa455989a6a0f Mon Sep 17 00:00:00 2001 From: Lokesh Lal Date: Tue, 7 Jan 2020 13:49:02 +0530 Subject: [PATCH 09/18] Removed Column.name property, since property value is same as actual variable --- airflow/models/taskinstance.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index c897736ab9f63..2e7453c8b4034 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -154,7 +154,7 @@ class TaskInstance(Base, LoggingMixin): unixname = Column(String(1000)) job_id = Column(Integer) pool = Column(String(50), nullable=False) - pool_capacity = Column('pool_capacity', Integer, default=1) + pool_capacity = Column(Integer, default=1) queue = Column(String(256)) priority_weight = Column(Integer) operator = Column(String(1000)) From 10fb0046c2bc89f77dda6fb7474432a164ef9ff0 Mon Sep 17 00:00:00 2001 From: Lokesh Lal Date: Tue, 7 Jan 2020 15:15:55 +0530 Subject: [PATCH 10/18] check for pool_capacity property to be always >= 1 --- airflow/models/baseoperator.py | 3 ++- airflow/models/taskinstance.py | 3 +++ tests/models/test_taskinstance.py | 14 ++++++++++++++ 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index de6f7943f50fa..795e1c27fd08f 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -178,7 +178,8 @@ class derived from this one results in the creation of a task object, :param pool: the slot pool this task should run in, slot pools are a way to limit concurrency for certain tasks :type pool: str - :param pool_capacity: the number of pool slots this task should use + :param pool_capacity: the number of pool slots this task should use (>= 1) + Values less than 1 are not allowed. :type pool_capacity: int :param sla: time by which the job is expected to succeed. Note that this represents the ``timedelta`` after the period is closed. For diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 2e7453c8b4034..f6b8cdd0374c2 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -197,6 +197,9 @@ def __init__(self, task, execution_date, state=None): self.pool = task.pool if hasattr(task, 'pool_capacity'): self.pool_capacity = task.pool_capacity + if task.pool_capacity < 1: + raise AirflowException("pool_capacity for %s in dag %s cannot be less than 1" + % (task.task_id, task.dag_id)) self.priority_weight = task.priority_weight_total self.try_number = 0 self.max_tries = self.task.retries diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index bfb173d7797f3..9c069f6ebb359 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -372,6 +372,20 @@ def test_run_pooling_task(self): db.clear_db_pools() self.assertEqual(ti.state, State.SUCCESS) + def test_pool_capacity_property(self): + """ + test that try to create a task with pool_capacity less than 1 + """ + def create_task_instance(): + dag = models.DAG(dag_id='test_run_pooling_task') + task = DummyOperator(task_id='test_run_pooling_task_op', dag=dag, + pool='test_pool', pool_capacity=0, owner='airflow', + start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) + ti = TI( + task=task, execution_date=timezone.utcnow()) + + self.assertRaises(AirflowException, create_task_instance) + @provide_session def test_ti_updates_with_task(self, session=None): """ From 5555b1d15c826c620ed56c43c2b70ca93ebc1439 Mon Sep 17 00:00:00 2001 From: Lokesh Lal Date: Tue, 7 Jan 2020 16:11:32 +0530 Subject: [PATCH 11/18] removed unused variable ti --- tests/models/test_taskinstance.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 9c069f6ebb359..c3d5dda2a982c 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -381,8 +381,7 @@ def create_task_instance(): task = DummyOperator(task_id='test_run_pooling_task_op', dag=dag, pool='test_pool', pool_capacity=0, owner='airflow', start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) - ti = TI( - task=task, execution_date=timezone.utcnow()) + return TI(task=task, execution_date=timezone.utcnow()) self.assertRaises(AirflowException, create_task_instance) From f47f23946ee228d9f6f9ce23dc146659f8f9051a Mon Sep 17 00:00:00 2001 From: Lokesh Lal Date: Tue, 7 Jan 2020 22:46:29 +0530 Subject: [PATCH 12/18] modified test cases for pool_capacity --- tests/test_sentry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_sentry.py b/tests/test_sentry.py index 9c0cd076e31e4..a77fcb4d43712 100644 --- a/tests/test_sentry.py +++ b/tests/test_sentry.py @@ -70,7 +70,7 @@ def setUp(self): self.dag.task_ids = [TASK_ID] # Mock the task - self.task = Mock(dag=self.dag, dag_id=DAG_ID, task_id=TASK_ID, params=[]) + self.task = Mock(dag=self.dag, dag_id=DAG_ID, task_id=TASK_ID, params=[], pool_capacity=1) self.task.__class__.__name__ = OPERATOR self.ti = TaskInstance(self.task, execution_date=EXECUTION_DATE) From ca3b1c24dee59fa3d8ded650c68f988d8e5e6e09 Mon Sep 17 00:00:00 2001 From: Lokesh Lal Date: Tue, 7 Jan 2020 23:10:01 +0530 Subject: [PATCH 13/18] modified test cases for pool_capacity --- tests/ti_deps/deps/test_dag_ti_slots_available_dep.py | 4 ++-- tests/ti_deps/deps/test_dag_unpaused_dep.py | 4 ++-- tests/ti_deps/deps/test_not_in_retry_period_dep.py | 2 +- tests/ti_deps/deps/test_ready_to_reschedule_dep.py | 4 ++-- tests/ti_deps/deps/test_runnable_exec_date_dep.py | 2 +- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py b/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py index 0c03baa603430..891d44474d289 100644 --- a/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py +++ b/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py @@ -31,7 +31,7 @@ def test_concurrency_reached(self): Test concurrency reached should fail dep """ dag = Mock(concurrency=1, concurrency_reached=True) - task = Mock(dag=dag) + task = Mock(dag=dag, pool_capacity=1) ti = TaskInstance(task, execution_date=None) self.assertFalse(DagTISlotsAvailableDep().is_met(ti=ti)) @@ -41,7 +41,7 @@ def test_all_conditions_met(self): Test all conditions met should pass dep """ dag = Mock(concurrency=1, concurrency_reached=False) - task = Mock(dag=dag) + task = Mock(dag=dag, pool_capacity=1) ti = TaskInstance(task, execution_date=None) self.assertTrue(DagTISlotsAvailableDep().is_met(ti=ti)) diff --git a/tests/ti_deps/deps/test_dag_unpaused_dep.py b/tests/ti_deps/deps/test_dag_unpaused_dep.py index 6bb4266ceb7d6..312920116552c 100644 --- a/tests/ti_deps/deps/test_dag_unpaused_dep.py +++ b/tests/ti_deps/deps/test_dag_unpaused_dep.py @@ -31,7 +31,7 @@ def test_concurrency_reached(self): Test paused DAG should fail dependency """ dag = Mock(is_paused=True) - task = Mock(dag=dag) + task = Mock(dag=dag, pool_capacity=1) ti = TaskInstance(task=task, execution_date=None) self.assertFalse(DagUnpausedDep().is_met(ti=ti)) @@ -41,7 +41,7 @@ def test_all_conditions_met(self): Test all conditions met should pass dep """ dag = Mock(is_paused=False) - task = Mock(dag=dag) + task = Mock(dag=dag, pool_capacity=1) ti = TaskInstance(task=task, execution_date=None) self.assertTrue(DagUnpausedDep().is_met(ti=ti)) diff --git a/tests/ti_deps/deps/test_not_in_retry_period_dep.py b/tests/ti_deps/deps/test_not_in_retry_period_dep.py index de1f574327e7c..d0c46b9243005 100644 --- a/tests/ti_deps/deps/test_not_in_retry_period_dep.py +++ b/tests/ti_deps/deps/test_not_in_retry_period_dep.py @@ -33,7 +33,7 @@ class TestNotInRetryPeriodDep(unittest.TestCase): def _get_task_instance(self, state, end_date=None, retry_delay=timedelta(minutes=15)): - task = Mock(retry_delay=retry_delay, retry_exponential_backoff=False) + task = Mock(retry_delay=retry_delay, retry_exponential_backoff=False, pool_capacity=1) ti = TaskInstance(task=task, state=state, execution_date=None) ti.end_date = end_date return ti diff --git a/tests/ti_deps/deps/test_ready_to_reschedule_dep.py b/tests/ti_deps/deps/test_ready_to_reschedule_dep.py index 7bc48e533d2dc..05dc789dbb1c9 100644 --- a/tests/ti_deps/deps/test_ready_to_reschedule_dep.py +++ b/tests/ti_deps/deps/test_ready_to_reschedule_dep.py @@ -32,12 +32,12 @@ class TestNotInReschedulePeriodDep(unittest.TestCase): def _get_task_instance(self, state): dag = DAG('test_dag') - task = Mock(dag=dag) + task = Mock(dag=dag, pool_capacity=1) ti = TaskInstance(task=task, state=state, execution_date=None) return ti def _get_task_reschedule(self, reschedule_date): - task = Mock(dag_id='test_dag', task_id='test_task') + task = Mock(dag_id='test_dag', task_id='test_task', pool_capacity=1) reschedule = TaskReschedule(task=task, execution_date=None, try_number=None, start_date=reschedule_date, end_date=reschedule_date, reschedule_date=reschedule_date) diff --git a/tests/ti_deps/deps/test_runnable_exec_date_dep.py b/tests/ti_deps/deps/test_runnable_exec_date_dep.py index 796161b2c1e02..bcf6348b0f6e0 100644 --- a/tests/ti_deps/deps/test_runnable_exec_date_dep.py +++ b/tests/ti_deps/deps/test_runnable_exec_date_dep.py @@ -31,7 +31,7 @@ class TestRunnableExecDateDep(unittest.TestCase): def _get_task_instance(self, execution_date, dag_end_date=None, task_end_date=None): dag = Mock(end_date=dag_end_date) - task = Mock(dag=dag, end_date=task_end_date) + task = Mock(dag=dag, end_date=task_end_date, pool_capacity=1) return TaskInstance(task=task, execution_date=execution_date) @freeze_time('2016-01-01') From 4c40b0c72f2fea1b820c789f75d9f19ee5b9d964 Mon Sep 17 00:00:00 2001 From: Lokesh Lal Date: Tue, 14 Jan 2020 09:06:49 +0530 Subject: [PATCH 14/18] created single head for sql migration scripts --- ...a4c2fd67d16b_add_pool_capacity_field_to_task_instance.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/migrations/versions/a4c2fd67d16b_add_pool_capacity_field_to_task_instance.py b/airflow/migrations/versions/a4c2fd67d16b_add_pool_capacity_field_to_task_instance.py index 2e2130cc90f41..c3ccc5a1e18b0 100644 --- a/airflow/migrations/versions/a4c2fd67d16b_add_pool_capacity_field_to_task_instance.py +++ b/airflow/migrations/versions/a4c2fd67d16b_add_pool_capacity_field_to_task_instance.py @@ -19,8 +19,8 @@ """add pool_capacity field to task_instance Revision ID: a4c2fd67d16b -Revises: fe461863935f -Create Date: 2019-12-31 13:50:02.227835 +Revises: 7939bcff74ba +Create Date: 2020-01-14 03:35:01.161519 """ @@ -29,7 +29,7 @@ # revision identifiers, used by Alembic. revision = 'a4c2fd67d16b' -down_revision = 'fe461863935f' +down_revision = '7939bcff74ba' branch_labels = None depends_on = None From c96f551d12bb73b1382dcd13c3476bcabf53398f Mon Sep 17 00:00:00 2001 From: Lokesh Lal Date: Tue, 14 Jan 2020 21:04:23 +0530 Subject: [PATCH 15/18] * renamed the pool_capacity field to pool_slots * moved "pool_slots < 1" check to baseoperator * removed hasattr check in taskinstance * extended the message in pool_slot_available_dep * added pool_slots in schema.json --- ..._add_pool_slots_field_to_task_instance.py} | 6 ++--- airflow/models/baseoperator.py | 11 ++++++---- airflow/models/pool.py | 6 ++--- airflow/models/taskinstance.py | 16 +++++--------- airflow/serialization/schema.json | 1 + .../ti_deps/deps/pool_slots_available_dep.py | 9 ++++---- tests/models/test_pool.py | 2 +- tests/models/test_taskinstance.py | 22 +++++++++---------- tests/test_sentry.py | 2 +- .../deps/test_dag_ti_slots_available_dep.py | 4 ++-- tests/ti_deps/deps/test_dag_unpaused_dep.py | 4 ++-- .../deps/test_not_in_retry_period_dep.py | 2 +- .../deps/test_pool_slots_available_dep.py | 8 +++---- .../deps/test_ready_to_reschedule_dep.py | 4 ++-- .../deps/test_runnable_exec_date_dep.py | 2 +- 15 files changed, 49 insertions(+), 50 deletions(-) rename airflow/migrations/versions/{a4c2fd67d16b_add_pool_capacity_field_to_task_instance.py => a4c2fd67d16b_add_pool_slots_field_to_task_instance.py} (85%) diff --git a/airflow/migrations/versions/a4c2fd67d16b_add_pool_capacity_field_to_task_instance.py b/airflow/migrations/versions/a4c2fd67d16b_add_pool_slots_field_to_task_instance.py similarity index 85% rename from airflow/migrations/versions/a4c2fd67d16b_add_pool_capacity_field_to_task_instance.py rename to airflow/migrations/versions/a4c2fd67d16b_add_pool_slots_field_to_task_instance.py index c3ccc5a1e18b0..4dd825e8b7de4 100644 --- a/airflow/migrations/versions/a4c2fd67d16b_add_pool_capacity_field_to_task_instance.py +++ b/airflow/migrations/versions/a4c2fd67d16b_add_pool_slots_field_to_task_instance.py @@ -16,7 +16,7 @@ # specific language governing permissions and limitations # under the License. -"""add pool_capacity field to task_instance +"""add pool_slots field to task_instance Revision ID: a4c2fd67d16b Revises: 7939bcff74ba @@ -35,8 +35,8 @@ def upgrade(): - op.add_column('task_instance', sa.Column('pool_capacity', sa.Integer, default=1)) + op.add_column('task_instance', sa.Column('pool_slots', sa.Integer, default=1)) def downgrade(): - op.drop_column('task_instance', 'pool_capacity') + op.drop_column('task_instance', 'pool_slots') diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 795e1c27fd08f..390b493f04600 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -178,9 +178,9 @@ class derived from this one results in the creation of a task object, :param pool: the slot pool this task should run in, slot pools are a way to limit concurrency for certain tasks :type pool: str - :param pool_capacity: the number of pool slots this task should use (>= 1) + :param pool_slots: the number of pool slots this task should use (>= 1) Values less than 1 are not allowed. - :type pool_capacity: int + :type pool_slots: int :param sla: time by which the job is expected to succeed. Note that this represents the ``timedelta`` after the period is closed. For example if you set an SLA of 1 hour, the scheduler would send an email @@ -316,7 +316,7 @@ def __init__( weight_rule: str = WeightRule.DOWNSTREAM, queue: str = conf.get('celery', 'default_queue'), pool: str = Pool.DEFAULT_POOL_NAME, - pool_capacity: int = 1, + pool_slots: int = 1, sla: Optional[timedelta] = None, execution_timeout: Optional[timedelta] = None, on_execute_callback: Optional[Callable] = None, @@ -385,7 +385,10 @@ def __init__( self.retries = retries self.queue = queue self.pool = pool - self.pool_capacity = pool_capacity + self.pool_slots = pool_slots + if self.pool_slots < 1: + raise AirflowException("pool slots for %s in dag %s cannot be less than 1" + % (self.task_id, dag.dag_id)) self.sla = sla self.execution_timeout = execution_timeout self.on_execute_callback = on_execute_callback diff --git a/airflow/models/pool.py b/airflow/models/pool.py index 1b598f3af9f87..dde63e81f403e 100644 --- a/airflow/models/pool.py +++ b/airflow/models/pool.py @@ -65,7 +65,7 @@ def occupied_slots(self, session): from airflow.models.taskinstance import TaskInstance # Avoid circular import return ( session - .query(func.sum(TaskInstance.pool_capacity)) + .query(func.sum(TaskInstance.pool_slots)) .filter(TaskInstance.pool == self.pool) .filter(TaskInstance.state.in_(STATES_TO_COUNT_AS_RUNNING)) .scalar() @@ -80,7 +80,7 @@ def used_slots(self, session): running = ( session - .query(func.sum(TaskInstance.pool_capacity)) + .query(func.sum(TaskInstance.pool_slots)) .filter(TaskInstance.pool == self.pool) .filter(TaskInstance.state == State.RUNNING) .scalar() @@ -96,7 +96,7 @@ def queued_slots(self, session): return ( session - .query(func.sum(TaskInstance.pool_capacity)) + .query(func.sum(TaskInstance.pool_slots)) .filter(TaskInstance.pool == self.pool) .filter(TaskInstance.state == State.QUEUED) .scalar() diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index f6b8cdd0374c2..501bdc4a469a4 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -154,7 +154,7 @@ class TaskInstance(Base, LoggingMixin): unixname = Column(String(1000)) job_id = Column(Integer) pool = Column(String(50), nullable=False) - pool_capacity = Column(Integer, default=1) + pool_slots = Column(Integer, default=1) queue = Column(String(256)) priority_weight = Column(Integer) operator = Column(String(1000)) @@ -195,11 +195,7 @@ def __init__(self, task, execution_date, state=None): self.queue = task.queue self.pool = task.pool - if hasattr(task, 'pool_capacity'): - self.pool_capacity = task.pool_capacity - if task.pool_capacity < 1: - raise AirflowException("pool_capacity for %s in dag %s cannot be less than 1" - % (task.task_id, task.dag_id)) + self.pool_slots = task.pool_slots self.priority_weight = task.priority_weight_total self.try_number = 0 self.max_tries = self.task.retries @@ -464,7 +460,7 @@ def refresh_from_db(self, session=None, lock_for_update=False, refresh_executor_ self.unixname = ti.unixname self.job_id = ti.job_id self.pool = ti.pool - self.pool_capacity = ti.pool_capacity + self.pool_slots = ti.pool_slots self.queue = ti.queue self.priority_weight = ti.priority_weight self.operator = ti.operator @@ -777,8 +773,7 @@ def _check_and_change_state_before_execution( """ task = self.task self.pool = pool or task.pool - if hasattr(task, 'pool_capacity'): - self.pool_capacity = task.pool_capacity + self.pool_slots = task.pool_slots self.test_mode = test_mode self.refresh_from_db(session=session, lock_for_update=True) self.job_id = job_id @@ -894,8 +889,7 @@ def _run_raw_task( task = self.task self.pool = pool or task.pool - if hasattr(task, 'pool_capacity'): - self.pool_capacity = task.pool_capacity + self.pool_slots = task.pool_slots self.test_mode = test_mode self.refresh_from_db(session=session) self.job_id = job_id diff --git a/airflow/serialization/schema.json b/airflow/serialization/schema.json index 3c7677f0b8e5d..cc510e2801015 100644 --- a/airflow/serialization/schema.json +++ b/airflow/serialization/schema.json @@ -134,6 +134,7 @@ "retries": { "type": "number" }, "queue": { "type": "string" }, "pool": { "type": "string" }, + "pool_slots": { "type": "string" }, "execution_timeout": { "$ref": "#/definitions/timedelta" }, "retry_delay": { "$ref": "#/definitions/timedelta" }, "retry_exponential_backoff": { "type": "boolean" }, diff --git a/airflow/ti_deps/deps/pool_slots_available_dep.py b/airflow/ti_deps/deps/pool_slots_available_dep.py index e6f89c426ca2f..3385881b4ad2c 100644 --- a/airflow/ti_deps/deps/pool_slots_available_dep.py +++ b/airflow/ti_deps/deps/pool_slots_available_dep.py @@ -62,12 +62,13 @@ def _get_dep_statuses(self, ti, session, dep_context=None): open_slots = pools[0].open_slots() if ti.state in STATES_TO_COUNT_AS_RUNNING: - open_slots += ti.pool_capacity + open_slots += ti.pool_slots - if open_slots <= (ti.pool_capacity - 1): + if open_slots <= (ti.pool_slots - 1): yield self._failing_status( - reason=("Not scheduling since there are %s open slots in pool %s", - open_slots, pool_name) + reason=("Not scheduling since there are %s open slots in pool %s " + "and require %s pool slots", + open_slots, pool_name, ti.pool_slots) ) else: yield self._passing_status( diff --git a/tests/models/test_pool.py b/tests/models/test_pool.py index babd8ff392923..083f61e16a13d 100644 --- a/tests/models/test_pool.py +++ b/tests/models/test_pool.py @@ -97,7 +97,7 @@ def test_default_pool_open_slots(self): dag_id='test_default_pool_open_slots', start_date=DEFAULT_DATE, ) op1 = DummyOperator(task_id='dummy1', dag=dag) - op2 = DummyOperator(task_id='dummy2', dag=dag, pool_capacity=2) + op2 = DummyOperator(task_id='dummy2', dag=dag, pool_slots=2) ti1 = TI(task=op1, execution_date=DEFAULT_DATE) ti2 = TI(task=op2, execution_date=DEFAULT_DATE) ti1.state = State.RUNNING diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index c3d5dda2a982c..2f14b84a20527 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -302,7 +302,7 @@ def test_not_requeue_non_requeueable_task_instance(self): task_id='test_not_requeue_non_requeueable_task_instance_op', dag=dag, pool='test_pool', - pool_capacity=1, + pool_slots=1, owner='airflow', start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) ti = TI( @@ -345,7 +345,7 @@ def test_mark_non_runnable_task_as_success(self): task_id='test_mark_non_runnable_task_as_success_op', dag=dag, pool='test_pool', - pool_capacity=1, + pool_slots=1, owner='airflow', start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) ti = TI( @@ -363,7 +363,7 @@ def test_run_pooling_task(self): """ dag = models.DAG(dag_id='test_run_pooling_task') task = DummyOperator(task_id='test_run_pooling_task_op', dag=dag, - pool='test_pool', pool_capacity=1, owner='airflow', + pool='test_pool', pool_slots=1, owner='airflow', start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) ti = TI( task=task, execution_date=timezone.utcnow()) @@ -372,14 +372,14 @@ def test_run_pooling_task(self): db.clear_db_pools() self.assertEqual(ti.state, State.SUCCESS) - def test_pool_capacity_property(self): + def test_pool_slots_property(self): """ - test that try to create a task with pool_capacity less than 1 + test that try to create a task with pool_slots less than 1 """ def create_task_instance(): dag = models.DAG(dag_id='test_run_pooling_task') task = DummyOperator(task_id='test_run_pooling_task_op', dag=dag, - pool='test_pool', pool_capacity=0, owner='airflow', + pool='test_pool', pool_slots=0, owner='airflow', start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) return TI(task=task, execution_date=timezone.utcnow()) @@ -422,7 +422,7 @@ def test_run_pooling_task_with_mark_success(self): task_id='test_run_pooling_task_with_mark_success_op', dag=dag, pool='test_pool', - pool_capacity=1, + pool_slots=1, owner='airflow', start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) ti = TI( @@ -635,7 +635,7 @@ def func(): dag=dag, owner='airflow', pool='test_pool', - pool_capacity=1, + pool_slots=1, start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) ti = TI(task=task, execution_date=timezone.utcnow()) @@ -732,7 +732,7 @@ def func(): dag=dag, owner='airflow', pool='test_pool', - pool_capacity=1, + pool_slots=1, start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) ti = TI(task=task, execution_date=timezone.utcnow()) @@ -926,7 +926,7 @@ def test_xcom_pull_after_success(self): task_id='test_xcom', dag=dag, pool='test_xcom', - pool_capacity=1, + pool_slots=1, owner='airflow', start_date=timezone.datetime(2016, 6, 2, 0, 0, 0)) exec_date = timezone.utcnow() @@ -961,7 +961,7 @@ def test_xcom_pull_different_execution_date(self): task_id='test_xcom', dag=dag, pool='test_xcom', - pool_capacity=1, + pool_slots=1, owner='airflow', start_date=timezone.datetime(2016, 6, 2, 0, 0, 0)) exec_date = timezone.utcnow() diff --git a/tests/test_sentry.py b/tests/test_sentry.py index a77fcb4d43712..5e341b58e27c7 100644 --- a/tests/test_sentry.py +++ b/tests/test_sentry.py @@ -70,7 +70,7 @@ def setUp(self): self.dag.task_ids = [TASK_ID] # Mock the task - self.task = Mock(dag=self.dag, dag_id=DAG_ID, task_id=TASK_ID, params=[], pool_capacity=1) + self.task = Mock(dag=self.dag, dag_id=DAG_ID, task_id=TASK_ID, params=[], pool_slots=1) self.task.__class__.__name__ = OPERATOR self.ti = TaskInstance(self.task, execution_date=EXECUTION_DATE) diff --git a/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py b/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py index 891d44474d289..5ff628a1b0d98 100644 --- a/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py +++ b/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py @@ -31,7 +31,7 @@ def test_concurrency_reached(self): Test concurrency reached should fail dep """ dag = Mock(concurrency=1, concurrency_reached=True) - task = Mock(dag=dag, pool_capacity=1) + task = Mock(dag=dag, pool_slots=1) ti = TaskInstance(task, execution_date=None) self.assertFalse(DagTISlotsAvailableDep().is_met(ti=ti)) @@ -41,7 +41,7 @@ def test_all_conditions_met(self): Test all conditions met should pass dep """ dag = Mock(concurrency=1, concurrency_reached=False) - task = Mock(dag=dag, pool_capacity=1) + task = Mock(dag=dag, pool_slots=1) ti = TaskInstance(task, execution_date=None) self.assertTrue(DagTISlotsAvailableDep().is_met(ti=ti)) diff --git a/tests/ti_deps/deps/test_dag_unpaused_dep.py b/tests/ti_deps/deps/test_dag_unpaused_dep.py index 312920116552c..0b87c754f666e 100644 --- a/tests/ti_deps/deps/test_dag_unpaused_dep.py +++ b/tests/ti_deps/deps/test_dag_unpaused_dep.py @@ -31,7 +31,7 @@ def test_concurrency_reached(self): Test paused DAG should fail dependency """ dag = Mock(is_paused=True) - task = Mock(dag=dag, pool_capacity=1) + task = Mock(dag=dag, pool_slots=1) ti = TaskInstance(task=task, execution_date=None) self.assertFalse(DagUnpausedDep().is_met(ti=ti)) @@ -41,7 +41,7 @@ def test_all_conditions_met(self): Test all conditions met should pass dep """ dag = Mock(is_paused=False) - task = Mock(dag=dag, pool_capacity=1) + task = Mock(dag=dag, pool_slots=1) ti = TaskInstance(task=task, execution_date=None) self.assertTrue(DagUnpausedDep().is_met(ti=ti)) diff --git a/tests/ti_deps/deps/test_not_in_retry_period_dep.py b/tests/ti_deps/deps/test_not_in_retry_period_dep.py index d0c46b9243005..7e5ca58e2f028 100644 --- a/tests/ti_deps/deps/test_not_in_retry_period_dep.py +++ b/tests/ti_deps/deps/test_not_in_retry_period_dep.py @@ -33,7 +33,7 @@ class TestNotInRetryPeriodDep(unittest.TestCase): def _get_task_instance(self, state, end_date=None, retry_delay=timedelta(minutes=15)): - task = Mock(retry_delay=retry_delay, retry_exponential_backoff=False, pool_capacity=1) + task = Mock(retry_delay=retry_delay, retry_exponential_backoff=False, pool_slots=1) ti = TaskInstance(task=task, state=state, execution_date=None) ti.end_date = end_date return ti diff --git a/tests/ti_deps/deps/test_pool_slots_available_dep.py b/tests/ti_deps/deps/test_pool_slots_available_dep.py index 271df584266ab..74c854858e6f9 100644 --- a/tests/ti_deps/deps/test_pool_slots_available_dep.py +++ b/tests/ti_deps/deps/test_pool_slots_available_dep.py @@ -40,22 +40,22 @@ def tearDown(self): @patch('airflow.models.Pool.open_slots', return_value=0) # pylint: disable=unused-argument def test_pooled_task_reached_concurrency(self, mock_open_slots): - ti = Mock(pool='test_pool', pool_capacity=1) + ti = Mock(pool='test_pool', pool_slots=1) self.assertFalse(PoolSlotsAvailableDep().is_met(ti=ti)) @patch('airflow.models.Pool.open_slots', return_value=1) # pylint: disable=unused-argument def test_pooled_task_pass(self, mock_open_slots): - ti = Mock(pool='test_pool', pool_capacity=1) + ti = Mock(pool='test_pool', pool_slots=1) self.assertTrue(PoolSlotsAvailableDep().is_met(ti=ti)) @patch('airflow.models.Pool.open_slots', return_value=0) # pylint: disable=unused-argument def test_running_pooled_task_pass(self, mock_open_slots): for state in STATES_TO_COUNT_AS_RUNNING: - ti = Mock(pool='test_pool', state=state, pool_capacity=1) + ti = Mock(pool='test_pool', state=state, pool_slots=1) self.assertTrue(PoolSlotsAvailableDep().is_met(ti=ti)) def test_task_with_nonexistent_pool(self): - ti = Mock(pool='nonexistent_pool', pool_capacity=1) + ti = Mock(pool='nonexistent_pool', pool_slots=1) self.assertFalse(PoolSlotsAvailableDep().is_met(ti=ti)) diff --git a/tests/ti_deps/deps/test_ready_to_reschedule_dep.py b/tests/ti_deps/deps/test_ready_to_reschedule_dep.py index 05dc789dbb1c9..4b9f0bd3f3a34 100644 --- a/tests/ti_deps/deps/test_ready_to_reschedule_dep.py +++ b/tests/ti_deps/deps/test_ready_to_reschedule_dep.py @@ -32,12 +32,12 @@ class TestNotInReschedulePeriodDep(unittest.TestCase): def _get_task_instance(self, state): dag = DAG('test_dag') - task = Mock(dag=dag, pool_capacity=1) + task = Mock(dag=dag, pool_slots=1) ti = TaskInstance(task=task, state=state, execution_date=None) return ti def _get_task_reschedule(self, reschedule_date): - task = Mock(dag_id='test_dag', task_id='test_task', pool_capacity=1) + task = Mock(dag_id='test_dag', task_id='test_task', pool_slots=1) reschedule = TaskReschedule(task=task, execution_date=None, try_number=None, start_date=reschedule_date, end_date=reschedule_date, reschedule_date=reschedule_date) diff --git a/tests/ti_deps/deps/test_runnable_exec_date_dep.py b/tests/ti_deps/deps/test_runnable_exec_date_dep.py index bcf6348b0f6e0..ccc39751b5fff 100644 --- a/tests/ti_deps/deps/test_runnable_exec_date_dep.py +++ b/tests/ti_deps/deps/test_runnable_exec_date_dep.py @@ -31,7 +31,7 @@ class TestRunnableExecDateDep(unittest.TestCase): def _get_task_instance(self, execution_date, dag_end_date=None, task_end_date=None): dag = Mock(end_date=dag_end_date) - task = Mock(dag=dag, end_date=task_end_date, pool_capacity=1) + task = Mock(dag=dag, end_date=task_end_date, pool_slots=1) return TaskInstance(task=task, execution_date=execution_date) @freeze_time('2016-01-01') From cea26369ca884996ed2dfec95889f0265dfe74a7 Mon Sep 17 00:00:00 2001 From: Lokesh Lal Date: Tue, 14 Jan 2020 21:28:16 +0530 Subject: [PATCH 16/18] * removed pool_slots from test cases wherever not required --- tests/models/test_taskinstance.py | 9 +-------- tests/ti_deps/deps/test_dag_unpaused_dep.py | 4 ++-- tests/ti_deps/deps/test_not_in_retry_period_dep.py | 2 +- tests/ti_deps/deps/test_ready_to_reschedule_dep.py | 4 ++-- tests/ti_deps/deps/test_runnable_exec_date_dep.py | 2 +- 5 files changed, 7 insertions(+), 14 deletions(-) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 2f14b84a20527..a5bdbe0a763b0 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -302,7 +302,6 @@ def test_not_requeue_non_requeueable_task_instance(self): task_id='test_not_requeue_non_requeueable_task_instance_op', dag=dag, pool='test_pool', - pool_slots=1, owner='airflow', start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) ti = TI( @@ -345,7 +344,6 @@ def test_mark_non_runnable_task_as_success(self): task_id='test_mark_non_runnable_task_as_success_op', dag=dag, pool='test_pool', - pool_slots=1, owner='airflow', start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) ti = TI( @@ -363,7 +361,7 @@ def test_run_pooling_task(self): """ dag = models.DAG(dag_id='test_run_pooling_task') task = DummyOperator(task_id='test_run_pooling_task_op', dag=dag, - pool='test_pool', pool_slots=1, owner='airflow', + pool='test_pool', owner='airflow', start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) ti = TI( task=task, execution_date=timezone.utcnow()) @@ -422,7 +420,6 @@ def test_run_pooling_task_with_mark_success(self): task_id='test_run_pooling_task_with_mark_success_op', dag=dag, pool='test_pool', - pool_slots=1, owner='airflow', start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) ti = TI( @@ -635,7 +632,6 @@ def func(): dag=dag, owner='airflow', pool='test_pool', - pool_slots=1, start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) ti = TI(task=task, execution_date=timezone.utcnow()) @@ -732,7 +728,6 @@ def func(): dag=dag, owner='airflow', pool='test_pool', - pool_slots=1, start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) ti = TI(task=task, execution_date=timezone.utcnow()) @@ -926,7 +921,6 @@ def test_xcom_pull_after_success(self): task_id='test_xcom', dag=dag, pool='test_xcom', - pool_slots=1, owner='airflow', start_date=timezone.datetime(2016, 6, 2, 0, 0, 0)) exec_date = timezone.utcnow() @@ -961,7 +955,6 @@ def test_xcom_pull_different_execution_date(self): task_id='test_xcom', dag=dag, pool='test_xcom', - pool_slots=1, owner='airflow', start_date=timezone.datetime(2016, 6, 2, 0, 0, 0)) exec_date = timezone.utcnow() diff --git a/tests/ti_deps/deps/test_dag_unpaused_dep.py b/tests/ti_deps/deps/test_dag_unpaused_dep.py index 0b87c754f666e..6bb4266ceb7d6 100644 --- a/tests/ti_deps/deps/test_dag_unpaused_dep.py +++ b/tests/ti_deps/deps/test_dag_unpaused_dep.py @@ -31,7 +31,7 @@ def test_concurrency_reached(self): Test paused DAG should fail dependency """ dag = Mock(is_paused=True) - task = Mock(dag=dag, pool_slots=1) + task = Mock(dag=dag) ti = TaskInstance(task=task, execution_date=None) self.assertFalse(DagUnpausedDep().is_met(ti=ti)) @@ -41,7 +41,7 @@ def test_all_conditions_met(self): Test all conditions met should pass dep """ dag = Mock(is_paused=False) - task = Mock(dag=dag, pool_slots=1) + task = Mock(dag=dag) ti = TaskInstance(task=task, execution_date=None) self.assertTrue(DagUnpausedDep().is_met(ti=ti)) diff --git a/tests/ti_deps/deps/test_not_in_retry_period_dep.py b/tests/ti_deps/deps/test_not_in_retry_period_dep.py index 7e5ca58e2f028..de1f574327e7c 100644 --- a/tests/ti_deps/deps/test_not_in_retry_period_dep.py +++ b/tests/ti_deps/deps/test_not_in_retry_period_dep.py @@ -33,7 +33,7 @@ class TestNotInRetryPeriodDep(unittest.TestCase): def _get_task_instance(self, state, end_date=None, retry_delay=timedelta(minutes=15)): - task = Mock(retry_delay=retry_delay, retry_exponential_backoff=False, pool_slots=1) + task = Mock(retry_delay=retry_delay, retry_exponential_backoff=False) ti = TaskInstance(task=task, state=state, execution_date=None) ti.end_date = end_date return ti diff --git a/tests/ti_deps/deps/test_ready_to_reschedule_dep.py b/tests/ti_deps/deps/test_ready_to_reschedule_dep.py index 4b9f0bd3f3a34..7bc48e533d2dc 100644 --- a/tests/ti_deps/deps/test_ready_to_reschedule_dep.py +++ b/tests/ti_deps/deps/test_ready_to_reschedule_dep.py @@ -32,12 +32,12 @@ class TestNotInReschedulePeriodDep(unittest.TestCase): def _get_task_instance(self, state): dag = DAG('test_dag') - task = Mock(dag=dag, pool_slots=1) + task = Mock(dag=dag) ti = TaskInstance(task=task, state=state, execution_date=None) return ti def _get_task_reschedule(self, reschedule_date): - task = Mock(dag_id='test_dag', task_id='test_task', pool_slots=1) + task = Mock(dag_id='test_dag', task_id='test_task') reschedule = TaskReschedule(task=task, execution_date=None, try_number=None, start_date=reschedule_date, end_date=reschedule_date, reschedule_date=reschedule_date) diff --git a/tests/ti_deps/deps/test_runnable_exec_date_dep.py b/tests/ti_deps/deps/test_runnable_exec_date_dep.py index ccc39751b5fff..796161b2c1e02 100644 --- a/tests/ti_deps/deps/test_runnable_exec_date_dep.py +++ b/tests/ti_deps/deps/test_runnable_exec_date_dep.py @@ -31,7 +31,7 @@ class TestRunnableExecDateDep(unittest.TestCase): def _get_task_instance(self, execution_date, dag_end_date=None, task_end_date=None): dag = Mock(end_date=dag_end_date) - task = Mock(dag=dag, end_date=task_end_date, pool_slots=1) + task = Mock(dag=dag, end_date=task_end_date) return TaskInstance(task=task, execution_date=execution_date) @freeze_time('2016-01-01') From 3a24c086a87414751cb82e4c9a8dd03f190a4853 Mon Sep 17 00:00:00 2001 From: Lokesh Lal Date: Thu, 16 Jan 2020 19:54:13 +0530 Subject: [PATCH 17/18] Corrected the type for pool_slots from string -> number --- airflow/serialization/schema.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/serialization/schema.json b/airflow/serialization/schema.json index cc510e2801015..49de949677e8d 100644 --- a/airflow/serialization/schema.json +++ b/airflow/serialization/schema.json @@ -134,7 +134,7 @@ "retries": { "type": "number" }, "queue": { "type": "string" }, "pool": { "type": "string" }, - "pool_slots": { "type": "string" }, + "pool_slots": { "type": "number" }, "execution_timeout": { "$ref": "#/definitions/timedelta" }, "retry_delay": { "$ref": "#/definitions/timedelta" }, "retry_exponential_backoff": { "type": "boolean" }, From 3a5351d8f19820d278c20f9fd6a7ed89beea93e8 Mon Sep 17 00:00:00 2001 From: Lokesh Lal Date: Fri, 17 Jan 2020 19:08:54 +0530 Subject: [PATCH 18/18] modified baseoperator constructor new field added test case --- tests/serialization/test_dag_serialization.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index b245a1128751e..424238ed23b61 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -574,6 +574,7 @@ def test_no_new_fields_added_to_base_operator(self): 'owner': 'airflow', 'params': {}, 'pool': 'default_pool', + 'pool_slots': 1, 'priority_weight': 1, 'queue': 'default', 'resources': None,