Conversation

@eskarimov
Contributor

@eskarimov eskarimov commented Nov 21, 2021

closes: #18999

The PR intends to add deferrable versions of DatabricksSubmitRunOperator and DatabricksRunNowOperator, which use the new Airflow functionality introduced in version 2.2.0.

There aren't many examples of deferrable operators at the moment, so I'd appreciate it if we could discuss the following:

  • Should we update the existing operators or create new ones?
    If we update the existing operators right away, we'll break backward compatibility for Airflow versions prior to 2.2.0, so I thought it'd be better to introduce new operators for now.
  • Execution timeout is not handled correctly for deferrable operators at the moment; I'll investigate separately under Deferrable Operators don't respect execution_timeout after being deferred #19382

@eskarimov
Contributor Author

@chinwobble, kindly requesting to review :)

    super().__init__(*args, **kwargs)

async def __aenter__(self):
    self._session = aiohttp.ClientSession()
Contributor

@chinwobble chinwobble Nov 22, 2021

This is really good stuff.

Currently you are using one hook per trigger. That means if you had 32 concurrent triggers, they would each have their own client session.
I've had a look at the aiohttp docs; they say:

it is suggested you use a single session for the lifetime of your application to benefit from connection pooling.

https://docs.aiohttp.org/en/stable/client_reference.html

What do you think?
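For reference, the suggestion boils down to something like this (a minimal sketch; StubClientSession is a hypothetical stand-in for aiohttp.ClientSession so the example isn't tied to the real library):

```python
# Create one session lazily and reuse it for the application's lifetime,
# so all requests share a single connection pool.
class StubClientSession:
    """Hypothetical stand-in for aiohttp.ClientSession."""
    pass

_session = None

def get_shared_session():
    global _session
    if _session is None:
        _session = StubClientSession()
    return _session

# Every caller gets the same session object.
assert get_shared_session() is get_shared_session()
```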

Member

Note that hooks are not necessarily run in the same process, so if you want to share sessions among them, you must move the abstraction to the trigger instead.

I wonder how viable it'd be to refactor the synchronous version (DatabricksHook) into a sans-I/O base class that can be used by DatabricksExecutionTrigger, instead of implementing a new, entirely separate hook for it.
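A sans-I/O split could look roughly like this (all names are illustrative, not the actual DatabricksHook API): the base class only builds requests and parses responses, while subclasses supply the transport.

```python
import asyncio

class BaseDatabricksClient:
    """Builds requests and parses responses; performs no I/O itself."""
    host = "example.cloud.databricks.com"  # hypothetical host

    def build_get_run_request(self, run_id):
        # Pure computation: method, URL and params, no network access.
        return ("GET",
                f"https://{self.host}/api/2.1/jobs/runs/get",
                {"run_id": run_id})

    def parse_run_state(self, response_json):
        return response_json["state"]["life_cycle_state"]

class AsyncDatabricksClient(BaseDatabricksClient):
    async def get_run_state(self, run_id, send):
        # `send` is an awaitable transport, e.g. an aiohttp request wrapper.
        method, url, params = self.build_get_run_request(run_id)
        return self.parse_run_state(await send(method, url, params))

async def fake_send(method, url, params):
    # Stand-in transport returning a canned Databricks-style response.
    return {"state": {"life_cycle_state": "TERMINATED"}}

state = asyncio.run(AsyncDatabricksClient().get_run_state(1, fake_send))
assert state == "TERMINATED"
```

A synchronous subclass would reuse the same two base methods with a blocking transport, which is the point of the refactor.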

Contributor

Ideally each triggerer process would share one session. A DAG can fire multiple DatabricksExecutionTrigger instances at the same time. Could these triggers all share the same session?

Member

They could, yes, because triggers are designed to run in the same process; that's why they are async. If each ran in its own process, there'd be no point in using asyncio in the first place. But the difficulty is deciding who should manage that shared session. I wonder if @andrewgodwin actually thought about this preemptively 😛

Contributor Author

Thank you both so much, those are very good points.

Trying to summarise:

  • Ideally all tasks executed by each triggerer should share the same session; however, I'm not sure if that'd be possible without touching the core Airflow triggerer functionality.

  • Note that hooks are not necessarily run in the same process, so if you want to share sessions among them, you must move the abstraction to the trigger instead.

    Does it mean moving the ClientSession initialisation to the trigger, i.e. as a property of DatabricksExecutionTrigger?
    My understanding is that when there are multiple triggerer processes, each trigger is executed as a separate async task with its own ClientSession instance. So even if we move it to DatabricksExecutionTrigger, it'd still create a session for each trigger run.

  • Refactoring the Databricks hook - I agree it'd be the perfect solution to keep everything inside a single class. After the comment by @alexott there's even more motivation to do that, to avoid duplicating work in the future. I'll try to refactor it to minimize repeated code and isolate the I/O operations.

Member

@uranusjr uranusjr Nov 23, 2021

you must move the abstraction to the trigger instead.

My understanding is that when there are multiple triggerer processes, each trigger is executed as a separate async task with its own ClientSession instance. So even if we move it to DatabricksExecutionTrigger, it'd still create a session for each trigger run.

Sorry, I think I meant to say triggerer instead of trigger 🙂

Your general summary is correct though; I don’t think it’s particularly worthwhile to make that optimisation right now.

Contributor

Yes, there is no easy mechanism right now for pooling connections that all run under a single triggerer, but you can also auto-pool from an async hook implementation by detecting same-thread, different-coroutine usage if you need to. It just makes things quite a bit more complex.
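Such auto-pooling might be sketched as a per-event-loop cache (hypothetical helper, not Airflow code; in real code the factory would create an aiohttp.ClientSession):

```python
import asyncio

# Cache one client per running event loop, so every trigger coroutine
# scheduled on the same triggerer loop shares it.
_clients = {}

def get_client_for_loop(factory):
    loop = asyncio.get_running_loop()
    if loop not in _clients:
        _clients[loop] = factory()
    return _clients[loop]

async def two_triggers():
    # Two "triggers" on the same loop receive the same client object.
    return get_client_for_loop(object) is get_client_for_loop(object)

assert asyncio.run(two_triggers())
```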

Contributor

@andrewgodwin Another issue I encountered with deferrable operators is that task duration during deferral is not counted.
When I run this operator, it says all my DAG's tasks finish in 1 sec.

Contributor

@alexott alexott left a comment

see comment about auth options

raise AirflowException(error_message)


class DatabricksRunNowOperator(BaseOperator):
Contributor

Maybe slightly outside the scope of this particular pull request, but it would be nice to implement the on_kill method too.
When a task is cancelled or marked as failed, it should also cancel the running Databricks job run.
To do this, you would need to push the run_id to XCom and retrieve it in the on_kill method. I'm not sure if this is possible.
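The idea could be sketched like this (hypothetical names, not the actual provider code); XCom would only be needed if on_kill ran in a process that cannot see the instance attribute:

```python
# Remember the run_id when the run is submitted, then cancel it in on_kill.
class SubmitRunOperatorSketch:
    def __init__(self, hook):
        self.hook = hook
        self.run_id = None

    def execute(self, context):
        self.run_id = self.hook.submit_run()
        # ...normally wait or defer here until the run finishes...

    def on_kill(self):
        if self.run_id is not None:
            self.hook.cancel_run(self.run_id)

class FakeHook:
    """Test double standing in for a real Databricks hook."""
    def __init__(self):
        self.cancelled = None
    def submit_run(self):
        return 42
    def cancel_run(self, run_id):
        self.cancelled = run_id

hook = FakeHook()
op = SubmitRunOperatorSketch(hook)
op.execute(context={})
op.on_kill()
assert hook.cancelled == 42  # the killed task cancelled its Databricks run
```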

Contributor Author

I've just tried playing with the deferrable operator's behaviour when a task is killed, and there might be a bug with it.
If a task is killed while being executed by the triggerer, there's no log available for this task after the kill. Later on, if the same task is started again, it finishes immediately, as though it continued after being deferred. I'll raise an issue with a detailed description of how to reproduce it.

Contributor

Ah yes, I can see how that might leave things in a strange state. We may need to add code to do better cleanup when a task is killed-while-deferred.

Contributor

@andrewgodwin @alexott
Any thoughts on how this should be handled?

"""
method, endpoint = endpoint_info

self.databricks_conn = self.get_connection(self.databricks_conn_id)
Contributor

Watch out - this is a blocking call, as it does a database fetch/secrets backend connection behind the scenes. You need to wrap it in something to make it async-compatible.
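One way to wrap it (a sketch; fetch_connection_blocking is a hypothetical stand-in for the real self.get_connection call):

```python
import asyncio

# Run a blocking lookup (a metadata-DB or secrets-backend fetch) in a
# worker thread so the event loop isn't stalled while it runs.
def fetch_connection_blocking(conn_id):
    # Pretend this performs a database / secrets backend round trip.
    return {"conn_id": conn_id, "host": "example.cloud.databricks.com"}

async def fetch_connection(conn_id):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, fetch_connection_blocking, conn_id)

conn = asyncio.run(fetch_connection("databricks_default"))
assert conn["conn_id"] == "databricks_default"
```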

Member

@potiuk potiuk Nov 29, 2021

Hmm. Maybe there is a way to at least flag Airflow's blocking methods when this happens? Or find a way to forbid them when we are in an async context, @andrewgodwin?

If you recall, this is the very case I was afraid would be too easy to "slip through" once people start making more use of deferrable operators and use them in their custom ones, which do not pass through the watchful eyes of you and the other Airflow committers.

Contributor

Indeed. We could always lift what we did with Django for this very problem: https://github.com/django/django/blob/37d9ea5d5c010d54a416417399344c39f4e9f93e/django/utils/asyncio.py#L8
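A simplified version of that Django guard, adapted as a sketch (the real Django helper also supports an environment-variable escape hatch, omitted here):

```python
import asyncio
import functools

# Refuse to run a blocking function while an event loop is active
# in the current thread.
def async_unsafe(func):
    @functools.wraps(func)
    def inner(*args, **kwargs):
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return func(*args, **kwargs)  # no running loop: safe to block
        raise RuntimeError(
            f"You cannot call {func.__name__} from an async context."
        )
    return inner

@async_unsafe
def get_connection(conn_id):  # hypothetical blocking helper
    return conn_id

assert get_connection("ok") == "ok"  # fine outside the event loop

async def misuse():
    try:
        get_connection("boom")  # called inside a running loop
    except RuntimeError:
        return True
    return False

assert asyncio.run(misuse()) is True
```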

Member

❤️ it. I think there are a number of "likely to use" methods in Airflow that we could mark as async_unsafe with decorators, anticipating they will cause troubles.

Member

Or maybe we could revert the logic :) and assume by default, any airflow call is unsafe (unless decorated with @async_safe).

Contributor

@andrewgodwin andrewgodwin Nov 29, 2021

The problem with turning async safety onto everything by default is that you need something like the decorator to be in the middle of the call to do the check - not sure we can reasonably override every Airflow API function without some seriously dark magic.

Member

Yeah. Basically you'd have to intercept every method call - while possible, it would be a terrible performance penalty.

@eskarimov eskarimov force-pushed the 18999-add-databricks-deferrable-operators branch from 97e0110 to 44ef54b Compare December 14, 2021 10:59
@eskarimov
Contributor Author

Updated and rebased the PR, implementing new auth methods for Azure AD, and making improvements based on the comments in the PR.

@eskarimov eskarimov force-pushed the 18999-add-databricks-deferrable-operators branch from 44ef54b to d327753 Compare December 15, 2021 14:17
@eskarimov eskarimov requested review from potiuk and uranusjr December 20, 2021 16:07
@eskarimov eskarimov force-pushed the 18999-add-databricks-deferrable-operators branch from 1870f68 to 7893d94 Compare December 28, 2021 14:09
@potiuk
Member

potiuk commented Dec 28, 2021

This operator will clearly not import on Airflow 2.1 (see the errors in tests). airflow.triggers is not available in Airflow 2.1.

There are two approaches we can take:

  1. make the Databricks provider Airflow 2.2+
  2. try/catch import errors and provide a warning that the deferrable operators will only work for Airflow 2.2+

I am for the 2nd solution, as there are quite a few non-deferrable operators there that should continue to work. But I would love to hear what others think.

@eskarimov
Contributor Author

This operator will clearly not import on Airflow 2.1 (see the errors in tests). airflow.triggers is not available in Airflow 2.1.

There are two approaches we can take:

  1. make the Databricks provider Airflow 2.2+
  2. try/catch import errors and provide a warning that the deferrable operators will only work for Airflow 2.2+

I am for the 2nd solution, as there are quite a few non-deferrable operators there that should continue to work. But I would love to hear what others think.

Thank you for this point; true, the deferrable operator functionality is only available starting with version 2.2.

The second approach with try/catch import errors sounds right to me.

I'd be glad to work on it, if others agree upon this approach.

@potiuk
Member

potiuk commented Dec 29, 2021

Thank you for this point; true, the deferrable operator functionality is only available starting with version 2.2.

Thank our CI :) It simply caught this!

@eskarimov
Contributor Author

Thinking out loud about the second approach with trying and catching exceptions:
when we catch an ImportError on importing the non-existing classes airflow.triggers.base.BaseTrigger and airflow.triggers.base.TriggerEvent, how would it be best to handle the exception?

DatabricksExecutionTrigger inherits from BaseTrigger. So if we just log a warning on importing the Airflow 2.2+ classes, the CI check will still fail, because BaseTrigger and TriggerEvent won't be defined.

I was thinking about using an abstract class as a substitute for the non-existing classes if an ImportError is raised, but I'm not sure if that'd be the right approach.

@potiuk
Member

potiuk commented Dec 31, 2021

DatabricksExecutionTrigger inherits from BaseTrigger. So if we just log a warning on importing the Airflow 2.2+ classes, the CI check will still fail, because BaseTrigger and TriggerEvent won't be defined.

Not really. In this case we can set BaseTrigger and TriggerEvent to None and add the warning to the "known warnings". As long as those classes are not actually used during parsing of the module (they should not be), it will work just fine.

We've done that in other places:

try:
    from kubernetes.client import models as k8s
except ImportError:
    log.warning(
        "The example_kubernetes_executor example DAG requires the kubernetes provider."
        " Please install it with: pip install apache-airflow[cncf.kubernetes]"
    )
    k8s = None
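Applied to the trigger classes, a sketch might look like this (hypothetical, not the actual provider code; `object` is used instead of None here so the subclass definition below still parses on older versions):

```python
import logging

log = logging.getLogger(__name__)

# On Airflow < 2.2 the import fails, so define placeholders that keep the
# module importable; the deferrable code path is never exercised there.
try:
    from airflow.triggers.base import BaseTrigger, TriggerEvent
except ImportError:
    log.warning("Deferrable operators only work on Airflow 2.2+.")
    BaseTrigger, TriggerEvent = object, None  # placeholders, never run

class DatabricksExecutionTrigger(BaseTrigger):  # parses on any version
    pass
```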

@eskarimov eskarimov force-pushed the 18999-add-databricks-deferrable-operators branch from 7893d94 to d3bdf79 Compare January 4, 2022 09:32
@eskarimov
Contributor Author

I've rebased the PR and resolved the conflicts, adapting the operators/hooks to the recent changes, and added a warning in case deferrable operators are used on Airflow prior to version 2.2.

@eskarimov
Contributor Author

Sorry to bother you, I just wanted to check if there's anything I could do here to improve the PR.

Trying to summarise the points discussed:

  • Blocking call to fetch secrets/connection details with self.get_connection() - the current implementation wraps databricks_conn in a cached_property decorator, called during DatabricksHook initialization; not sure if we should change anything here now.
  • Provide a warning if a deferrable operator is used on Airflow prior to 2.2. An alternative option is to mark the provider as Airflow 2.2+ only; any input here is very welcome.

I've also changed the initial approach for creating async hook methods: at first I thought about creating a DatabricksHookAsync class as a sub-class of DatabricksHook, to avoid duplicating code and have a clear separation between the sync and async hooks, but then I realised it'd be wrong to make it a sub-class, as we can't just replace usages of DatabricksHook with DatabricksHookAsync.
Creating and supporting an independent DatabricksHookAsync class seems non-optimal as well.
So I ended up creating async versions of the regular functions inside DatabricksHook. Please let me know if there's a better way to do that.
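The resulting shape is roughly this (names and bodies are illustrative, not the real DatabricksHook signatures): sync and async variants live on the same hook and share the non-I/O parts.

```python
import asyncio

class DatabricksHookSketch:
    host = "example.cloud.databricks.com"  # hypothetical host

    def _url(self, endpoint):
        # Shared, non-I/O part reused by both variants.
        return f"https://{self.host}/api/2.1/{endpoint}"

    def do_api_call(self, endpoint):
        # Real code would issue the request with `requests` here.
        return self._url(endpoint)

    async def a_do_api_call(self, endpoint):
        # Real code would issue the request with `aiohttp` here.
        return self._url(endpoint)

hook = DatabricksHookSketch()
assert hook.do_api_call("jobs/runs/get") == asyncio.run(
    hook.a_do_api_call("jobs/runs/get")
)
```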

@eskarimov eskarimov force-pushed the 18999-add-databricks-deferrable-operators branch from d3bdf79 to d0abe31 Compare February 18, 2022 08:49
@eskarimov
Contributor Author

Resolved the conflicts and rebased the PR.

@potiuk
Member

potiuk commented Feb 26, 2022

Tests are still failing

@eskarimov eskarimov force-pushed the 18999-add-databricks-deferrable-operators branch from d0abe31 to 1d87620 Compare March 7, 2022 21:31
@potiuk
Member

potiuk commented Mar 13, 2022

Some changes are merged so the conflicts need to be resolved.

@eskarimov eskarimov force-pushed the 18999-add-databricks-deferrable-operators branch from 1d87620 to 3856470 Compare March 15, 2022 22:08
@eskarimov
Contributor Author

The PR is rebased and the conflicts are resolved.

@eskarimov eskarimov force-pushed the 18999-add-databricks-deferrable-operators branch from 3856470 to 76ab794 Compare March 30, 2022 05:41
@eskarimov
Contributor Author

Resolved conflicts, rebased PR.

@potiuk
Member

potiuk commented Mar 31, 2022

I do not know that much about deferrable operators yet to comment, but at least from a quick look it looks good. @andrewgodwin?

@eskarimov eskarimov force-pushed the 18999-add-databricks-deferrable-operators branch from 76ab794 to 3472679 Compare April 11, 2022 07:36
@eskarimov
Contributor Author

There were some conflicts, resolved them and rebased.

@eskarimov eskarimov force-pushed the 18999-add-databricks-deferrable-operators branch from 3472679 to 9b40c02 Compare May 3, 2022 20:08
@eskarimov
Contributor Author

Rebased and resolved conflicts.
How could I help to get it reviewed?
I've also seen Astronomer's repo with plenty of deferrable operators, including Databricks ones; curious if they'll be contributed to Airflow one day.

@potiuk
Member

potiuk commented May 8, 2022

@alexott - any more comments ?

@alexott
Contributor

alexott commented May 11, 2022

Sorry for the delay, I will only have time to review over the weekend.

Contributor

@alexott alexott left a comment

LGTM, I'm only a bit worried about the code duplication between do_api_call and its async version. Maybe we need to do some refactoring later...

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label May 22, 2022
@github-actions

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@potiuk
Member

potiuk commented May 22, 2022

LGTM. We can remove duplication later I think :)

@potiuk potiuk merged commit 6150d28 into apache:main May 22, 2022
@pierrejeambrun
Member

@potiuk I think this introduced a small issue (an old variable was supposed to be removed). Tests are failing.

PR for a proposed fix -> #23856

@eskarimov
Contributor Author

Thank you everyone for your reviews, ideas and activity! :)

@eskarimov eskarimov deleted the 18999-add-databricks-deferrable-operators branch May 23, 2022 07:10
@ephraimbuddy ephraimbuddy added the changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) label May 28, 2022


Development

Successfully merging this pull request may close these issues.

Update databricks provider to use TriggerOperator

9 participants