Skip to content

Conversation

@lokeshlal
Copy link
Contributor

@lokeshlal lokeshlal commented Dec 31, 2019

PR contains changes in pool and task instance to provide functionality to tasks to use more than one pool slot.

  • Added pool_capacity field in TaskInstance (pool_capacity is defaulted to 1, to maintain the current behavior)
  • Added pool_capacity in baseoperator
  • Modified pools functionality to calculate the used/queued/occupied/available slots
  • Modified pool_slots_available_dep.py to check against task pool_capacity field instead of 1
  • Modifed test case in file test_pool_slots_available_dep.py to include pool_capacity in the Mock object

@tooptoop4
Copy link
Contributor

https://issues.apache.org/jira/browse/AIRFLOW-6227 could be another approach

@codecov-io
Copy link

codecov-io commented Dec 31, 2019

Codecov Report

Merging #6975 into master will increase coverage by 0.08%.
The diff coverage is 100%.

Impacted file tree graph

@@            Coverage Diff            @@
##           master   #6975      +/-   ##
=========================================
+ Coverage   84.81%   84.9%   +0.08%     
=========================================
  Files         679     680       +1     
  Lines       38491   38903     +412     
=========================================
+ Hits        32648   33032     +384     
- Misses       5843    5871      +28
Impacted Files Coverage Δ
airflow/models/baseoperator.py 96.26% <ø> (+0.18%) ⬆️
airflow/models/pool.py 97.36% <ø> (ø) ⬆️
airflow/models/taskinstance.py 95% <100%> (+0.08%) ⬆️
airflow/ti_deps/deps/pool_slots_available_dep.py 100% <100%> (ø) ⬆️
airflow/kubernetes/volume_mount.py 44.44% <0%> (-55.56%) ⬇️
airflow/kubernetes/volume.py 52.94% <0%> (-47.06%) ⬇️
airflow/kubernetes/pod_launcher.py 45.25% <0%> (-46.72%) ⬇️
airflow/kubernetes/refresh_config.py 50.98% <0%> (-23.53%) ⬇️
...rflow/contrib/operators/kubernetes_pod_operator.py 78.75% <0%> (-20%) ⬇️
airflow/utils/file.py 86.3% <0%> (-5.17%) ⬇️
... and 97 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update da2a617...a51783b. Read the comment docs.

@lokeshlal
Copy link
Contributor Author

lokeshlal commented Dec 31, 2019

@tooptoop4 Yes the approach looks good when multiple pools are required as described in the jira ticket.
This PR will be useful in a scenario, where we have a spark cluster where jobs needs to be submitted with different complexity (such as Large jobs, medium jobs etc) and each job would require different capacity on spark cluster. Hence dynamic pooling can help control the spark cluster capacity directly from the Airflow using pools. this is aligned to the following jira ticket https://issues.apache.org/jira/browse/AIRFLOW-1467

The problem statement mentioned in the jira ticket https://issues.apache.org/jira/browse/AIRFLOW-6227, can be handled via locking a file for write. That is, if the ask is to keep one writer on a table, then before triggering spark job, create another task that will put a file write lock on a file (name same as table name) in the file system (libraries such as fasteners or lockfile in a python operator can be used). This will make sure that at a time only one job will be triggered for the said table and makes the code more dynamic rather than creating pools every time a new table is introduced. and once the spark job finishes (weather the job fail or pass) then release the lock from the file.

@dimberman dimberman self-requested a review December 31, 2019 18:38
@dimberman
Copy link
Contributor

@lokeshlal +1 this is a great idea!

Could you please make the unit tests a bit more comprehensive? (e.g. multiple pools)

@lokeshlal
Copy link
Contributor Author

@dimberman Sorry for the confusion I created with my previous comment. I have not implemented multiple pools... only implemented a way to use more than one pool slot in a task. specifically trying to resolve https://issues.apache.org/jira/browse/AIRFLOW-1467

for example, provide number of pool slots a task will be using using pool_capacity property

task = BashOperator(
    task_id='sleep',
    bash_command='sleep 300',
    retries=3,
    pool='test',
    pool_capacity=6,	
    dag=dag)

@lokeshlal lokeshlal changed the title Dynamic pooling via allowing tasks to use more than one pool slot (depending upon the need) [AIRFLOW-1467] Dynamic pooling via allowing tasks to use more than one pool slot (depending upon the need) Jan 2, 2020
@lokeshlal
Copy link
Contributor Author

@dimberman I have updated pools test cases for task instances. Could you please review the code again. Thank you.

@dimberman
Copy link
Contributor

@lokeshlal why do all of the test cases have priority_weight of 1? Could you do a test of weight=2 and of weight=0?

@lokeshlal
Copy link
Contributor Author

@dimberman Yes I have modified test cases to include pool_capacity to be 0, 1 and 2. I have again modified to include the pool_capacity in other test cases as well. the build is under the process. hopefully I have not missed the any other test case.

@dimberman
Copy link
Contributor

Thank you @lokeshlal. I'll merge this once tests pass.

@lokeshlal
Copy link
Contributor Author

Thank you @dimberman.

@lokeshlal
Copy link
Contributor Author

Hi @dimberman, I have done changes in few more test cases to pass pool_capacity property in the Mock object. Test cases also executed successfully. Could you please review the latest commit as well (commit after your approval).

@lokeshlal
Copy link
Contributor Author

Hi @dimberman, Could you please review the remaining changes as well and if everything looks good, please merge the changes as well. thanks.

@dimberman
Copy link
Contributor

@lokeshlal LGTM thanks for setting this up!

@dimberman dimberman merged commit 277d01d into apache:master Jan 13, 2020
@boring-cyborg
Copy link

boring-cyborg bot commented Jan 13, 2020

Awesome work, congrats on your first merged pull request!

potiuk added a commit that referenced this pull request Jan 13, 2020
… than one pool slot (depending upon the need) (#6975)"

This reverts commit 277d01d.
@potiuk
Copy link
Member

potiuk commented Jan 13, 2020

Hello @lokeshlal @dimberman - I had to revert this change from master as it created two heads in sqlalchemy migrations and we have test to detect that. The root cause was that this PR was not rebased after the second head was added (otherwise it would have been detected by failing Travis CI run - last time Travis was run 6 days ago for this PR and the other head (#6489 ) was merged 3 days ago.

It's no-one fault, it can happen in our process, but we will have to add better protection against such problems in the future. In the meantime can you please create another PR with this change, fix the duplicated heads (by making your migration based on the new head migration) and re-submit/merge the PR again.

Sorry for the inconvenience @lokeshlal - I understand it was your first PR merged and you hit this - very rare - problem :(. We will do better in the future to avoid it.

@potiuk
Copy link
Member

potiuk commented Jan 13, 2020

@potiuk
Copy link
Member

potiuk commented Jan 13, 2020

I prefer the "hacky" way.

@lokeshlal
Copy link
Contributor Author

Thank you @dimberman and @potiuk - I have created another PR with a single migration head. #7160
It is in travis buid. hopefully test will pass.

@potiuk
Copy link
Member

potiuk commented Jan 14, 2020

We still have "kerberos" related instability. I restarted the build and hopefully it will pass.

@potiuk
Copy link
Member

potiuk commented Jan 14, 2020

(working to fix the instability as well)

galuszkak pushed a commit to FlyrInc/apache-airflow that referenced this pull request Mar 5, 2020
…e pool slot (depending upon the need) (apache#6975)

* adding pool capacity required for each task for dynamic pooling

* Added pool_capacity column migration script

* removed test checkedin file

* removed extra space

* correct test_database_schema_and_sqlalchemy_model_are_in_sync test case

* Added description for pool_capacity property for task instance

* Modified test cases to include pool_capacity along with pool in task instances

* Modified test cases to include pool_capacity along with pool in task instances

* Removed Column.name property, since property value is same as actual variable

* check for pool_capacity property to be always >= 1

* removed unused variable ti

* modified test cases for pool_capacity

* modified test cases for pool_capacity
galuszkak pushed a commit to FlyrInc/apache-airflow that referenced this pull request Mar 5, 2020
… than one pool slot (depending upon the need) (apache#6975)"

This reverts commit 277d01d.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants