Skip to content

Conversation

@dstandish
Copy link
Contributor

@dstandish dstandish commented Dec 29, 2021

Historically sensors do not support returning XCom unless you override the execute method.
With this change we can optionally return an XCom value by returning one along with True
in new class PokeReturnValue

A key motivator for this PR is this other PR to add @task.sensor decorator. With the present PR merged, we can make it so the sensor decorator can return a tuple True, {"some": "info"} to push xcom.

@dstandish dstandish marked this pull request as draft December 29, 2021 04:39
@dstandish dstandish mentioned this pull request Dec 29, 2021
@potiuk
Copy link
Member

potiuk commented Dec 29, 2021

Duplicate of #20546 ?

@potiuk
Copy link
Member

potiuk commented Dec 29, 2021

Closing as duplicate

@potiuk potiuk closed this Dec 29, 2021
@potiuk potiuk added the duplicate Issue that is duplicated label Dec 29, 2021
@dstandish
Copy link
Contributor Author

it's an alternative approach @potiuk that, in contrast with #20546, is fully backward compatible...

@potiuk
Copy link
Member

potiuk commented Dec 29, 2021

Ah sorry. let me take a look.

@potiuk
Copy link
Member

potiuk commented Dec 29, 2021

It's still not really backwards compatible (in the sense a having a New Sensor returnig PokeReturnValue run on airlfow pre 2.3).

If you return PokeReturnValue() in Airlfow 2.2. it will not work. The BaseSensorOperator from Airflow 2.2 will run this:

while not self.poke():

And it will always finish after first check no matter what "is_done" is. In this case self.poke() returns PokeReturnValue() class which is Truthy no matter what is set in is_done).

But It could be made backwards compatible - and I like this much more than my "return Truthy" approach.

Should be enough to do this:

class PokeReturnValue:
    """
    Sensors can optionally return this an instance of this class in the poke method.
    If an xcom value is supplied when sensor is done, then the XCom value will be
    push through the operator return value.
    """

    def __init__(self, is_done, xcom_value=None):
        self.xcom_value = xcom_value
        self.is_done = is_done


    def __bool__(self):
       return self.is_done

I think this is the BEST approach.

UPDATE: Nope. It won't work, because in Airflow 2.2 there wil be no PokeReturnValue class.

@mingshi-wang
Copy link
Contributor

It's still not really backwards compatible (in the sense a having a New Sensor returnig PokeReturnValue run on airlfow pre 2.3).

If you return PokeReturnValue() in Airlfow 2.2. it will not work. The BaseSensorOperator from Airflow 2.2 will run this:

while not self.poke():

And it will always finish after first check no matter what "is_done" is. In this case self.poke() returns PokeReturnValue() class which is Truthy no matter what is set in is_done).

But It could be made backwards compatible - and I like this much more than my "return Truthy" approach.

Should be enough to do this:

class PokeReturnValue:
    """
    Sensors can optionally return this an instance of this class in the poke method.
    If an xcom value is supplied when sensor is done, then the XCom value will be
    push through the operator return value.
    """

    def __init__(self, is_done, xcom_value=None):
        self.xcom_value = xcom_value
        self.is_done = is_done


    def __bool__(self):
       return self.is_done

I think this is the BEST approach.

UPDATE: Nope. It won't work, because in Airflow 2.2 there wil be no PokeReturnValue class.

It's still not really backwards compatible (in the sense a having a New Sensor returnig PokeReturnValue run on airlfow pre 2.3).

If you return PokeReturnValue() in Airlfow 2.2. it will not work. The BaseSensorOperator from Airflow 2.2 will run this:

while not self.poke():

And it will always finish after first check no matter what "is_done" is. In this case self.poke() returns PokeReturnValue() class which is Truthy no matter what is set in is_done).

But It could be made backwards compatible - and I like this much more than my "return Truthy" approach.

Should be enough to do this:

class PokeReturnValue:
    """
    Sensors can optionally return this an instance of this class in the poke method.
    If an xcom value is supplied when sensor is done, then the XCom value will be
    push through the operator return value.
    """

    def __init__(self, is_done, xcom_value=None):
        self.xcom_value = xcom_value
        self.is_done = is_done


    def __bool__(self):
       return self.is_done

I think this is the BEST approach.

UPDATE: Nope. It won't work, because in Airflow 2.2 there wil be no PokeReturnValue class.

A not-so-ideal way is to declare a dynamic return type:
def poke(self, context: Context) -> Union[bool, 'PokeReturnValue']:
...

@potiuk
Copy link
Member

potiuk commented Dec 29, 2021

A not-so-ideal way is to declare a dynamic return type:
def poke(self, context: Context) -> Union[bool, 'PokeReturnValue']:

That's not enough. The problem is that any sensor that uses PokeReturnValue would have to be Airlfow 2.3+ only (PokeReturnValue will fail to be imported for such sensor)

@mingshi-wang
Copy link
Contributor

A not-so-ideal way is to declare a dynamic return type:
def poke(self, context: Context) -> Union[bool, 'PokeReturnValue']:

That's not enough. The problem is that any sensor that uses PokeReturnValue would have to be Airlfow 2.3+ only (PokeReturnValue will fail to be imported for such sensor)

You are right - what I can think of is to introduce a new type of sensor, e.x RichSensor and make the poke() method return a PokeReturnValue type.

Historically sensors do not support returning XCom unless you override the `execute` method.
With this change we can optionally return an XCom value by returning one along with `True`
in new class PokeReturnValue
@dstandish dstandish force-pushed the allow-sensor-to-return-xcom-2 branch from c352995 to 19a9d2a Compare January 28, 2022 20:48
@dstandish
Copy link
Contributor Author

this pr taken over by @mingshi-wang in #20656

@dstandish dstandish closed this Mar 1, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

duplicate Issue that is duplicated

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants