Conversation

@tatiana
Contributor

@tatiana tatiana commented Jul 15, 2020

Closes #8703. Any feedback is very welcome!

There are some standing questions:

  1. What are your thoughts on isolating the JSON serialization logic in a separate "lightweight" module? My initial attempt was to use the existing serialized_objects.py module, without duplicating any code, following the suggestion from Support for set in XCom serialization #8703 (comment):
# Module: xcom.py

from airflow.serialization.serialized_objects import BaseSerialization
(...)
# within the function `serialize_value`
            dict_ = BaseSerialization.serialize_to_json(value)
            return json.dumps(dict_).encode('UTF-8')
(...)
# within the function `deserialize_value`
            dict_ = json.loads(result.value.decode('UTF-8'))
            return BaseSerialization.from_dict(dict_)

However, this seems to have resulted in a circular dependency, which could be observed when running pytest tests/models/test_xcom.py:

______________________ ERROR collecting tests/models/test_xcom.py ______________________
/usr/local/lib/python3.6/logging/config.py:390: in resolve
    found = getattr(found, frag)
E   AttributeError: module 'airflow.utils.log' has no attribute 'file_task_handler'

During handling of the above exception, another exception occurred:
/usr/local/lib/python3.6/logging/config.py:392: in resolve
    self.importer(used)
airflow/utils/log/file_task_handler.py:26: in <module>
    from airflow.models import TaskInstance
airflow/models/__init__.py:20: in <module>
    from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
airflow/models/baseoperator.py:45: in <module>
    from airflow.models.taskinstance import TaskInstance, clear_task_instances
airflow/models/taskinstance.py:50: in <module>
    from airflow.models.xcom import XCOM_RETURN_KEY, XCom
airflow/models/xcom.py:37: in <module>
    from airflow.serialization.serialized_objects import BaseSerialization
airflow/serialization/serialized_objects.py:31: in <module>
    from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
E   ImportError: cannot import name 'BaseOperator'

The above exception was the direct cause of the following exception:
/usr/local/lib/python3.6/logging/config.py:565: in configure
    handler = self.configure_handler(handlers[name])
/usr/local/lib/python3.6/logging/config.py:715: in configure_handler
    klass = self.resolve(cname)
/usr/local/lib/python3.6/logging/config.py:399: in resolve
    raise v
/usr/local/lib/python3.6/logging/config.py:392: in resolve
    self.importer(used)
airflow/utils/log/file_task_handler.py:26: in <module>
    from airflow.models import TaskInstance
airflow/models/__init__.py:20: in <module>
    from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
airflow/models/baseoperator.py:45: in <module>
    from airflow.models.taskinstance import TaskInstance, clear_task_instances
airflow/models/taskinstance.py:50: in <module>
    from airflow.models.xcom import XCOM_RETURN_KEY, XCom
airflow/models/xcom.py:37: in <module>
    from airflow.serialization.serialized_objects import BaseSerialization
airflow/serialization/serialized_objects.py:31: in <module>
    from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
E   ValueError: Cannot resolve 'airflow.utils.log.file_task_handler.FileTaskHandler': cannot import name 'BaseOperator'

During handling of the above exception, another exception occurred:
/usr/local/lib/python3.6/site-packages/py/_path/local.py:704: in pyimport
    __import__(modname)
tests/models/__init__.py:21: in <module>
    from airflow.utils import timezone
airflow/__init__.py:41: in <module>
    settings.initialize()
airflow/settings.py:326: in initialize
    LOGGING_CLASS_PATH = configure_logging()
airflow/logging_config.py:69: in configure_logging
    raise e
airflow/logging_config.py:64: in configure_logging
    dictConfig(logging_config)
/usr/local/lib/python3.6/logging/config.py:802: in dictConfig
    dictConfigClass(config).configure()
/usr/local/lib/python3.6/logging/config.py:573: in configure
    '%r: %s' % (name, e))
E   ValueError: Unable to configure handler 'task': Cannot resolve 'airflow.utils.log.file_task_handler.FileTaskHandler': cannot import name 'BaseOperator'
  2. Assuming there is agreement on (1), I can refactor the class serialization/serialized_objects.py::BaseSerialization to use the new module, so that we avoid duplicating the logic.
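
To make the "lightweight module" idea concrete, here is a minimal sketch of what such a module could look like. It is not the code in this PR: the tag names `TYPE_KEY` and `VALUE_KEY` and the helpers `encode` and `decode` are hypothetical, and the sketch assumes dict keys are strings, since JSON requires that.

```python
import json
from typing import Any

# Hypothetical tag names; the encoding used by BaseSerialization may differ.
TYPE_KEY = "__type"
VALUE_KEY = "__var"


def encode(value: Any) -> Any:
    """Turn sets, tuples and dicts into tagged, JSON-compatible structures."""
    if value is None or isinstance(value, (bool, int, float, str)):
        return value
    if isinstance(value, list):
        return [encode(item) for item in value]
    if isinstance(value, dict):
        # Dicts are tagged too, so user data cannot be mistaken for a tag.
        return {TYPE_KEY: "dict", VALUE_KEY: {k: encode(v) for k, v in value.items()}}
    if isinstance(value, set):
        return {TYPE_KEY: "set", VALUE_KEY: [encode(item) for item in value]}
    if isinstance(value, tuple):
        return {TYPE_KEY: "tuple", VALUE_KEY: [encode(item) for item in value]}
    raise TypeError(f"Cannot serialize value of type {type(value).__name__}")


def decode(value: Any) -> Any:
    """Reverse encode(), rebuilding sets, tuples and dicts from their tags."""
    if isinstance(value, list):
        return [decode(item) for item in value]
    if isinstance(value, dict):
        kind, var = value[TYPE_KEY], value[VALUE_KEY]
        if kind == "dict":
            return {k: decode(v) for k, v in var.items()}
        if kind == "set":
            return {decode(item) for item in var}
        if kind == "tuple":
            return tuple(decode(item) for item in var)
        raise TypeError(f"Unknown type tag: {kind}")
    return value


def serialize_value(value: Any) -> bytes:
    return json.dumps(encode(value)).encode("UTF-8")


def deserialize_value(raw: bytes) -> Any:
    return decode(json.loads(raw.decode("UTF-8")))


payload = {"ids": {1, 2, 3}, "nested": [{"tags": {"a", "b"}}, (1, 2)], "none": None}
round_tripped = deserialize_value(serialize_value(payload))
```

Because a module like this imports nothing from airflow.models, importing it from xcom.py could not take part in the cycle shown in the traceback above.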

Make sure to mark the boxes below before creating PR: [x]

  • Description above provides a context of the change
  • Unit tests coverage for changes (not needed for documentation changes)
  • Target Github ISSUE in the description if exists
  • Commits follow "How to write a good git commit message"
  • Relevant documentation is updated including usage instructions.
  • I will engage committers as explained in Contribution Workflow Example.

In the case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In the case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards-incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

@boring-cyborg

boring-cyborg bot commented Jul 15, 2020

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (flake8, pylint and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it’s a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://apache-airflow-slack.herokuapp.com/

@tatiana tatiana marked this pull request as ready for review July 16, 2020 01:34
@tatiana
Contributor Author

tatiana commented Jul 16, 2020

Hey, @turbaszek @potiuk thank you very much for the Workshop yesterday. It was great!
If you have any suggestions, it would be great! It took me a bit longer than the original 45min estimate to have this first draft ;)

@potiuk
Member

potiuk commented Jul 16, 2020

@tatiana There is a test failing which is likely related to the XCom serialization change.

@tatiana
Contributor Author

tatiana commented Jul 16, 2020

Thanks, @potiuk ! I'll solve those later today, sorry - I'll make sure the CI is happy.
Do you have any comments on the implementation itself? Anything you'd like me to change with the strategy?

@turbaszek turbaszek requested review from kaxil and turbaszek July 16, 2020 13:02
@potiuk
Member

potiuk commented Jul 16, 2020

I did not yet have time to take a deeper look - and a little busy today :(. I will take a look tomorrow I think! Nothing to be sorry about :). Tests are there to catch all those things that are not obvious :)

@turbaszek
Member

Hm, let me take a look at the failing lineage tests...

Member

We should support None as a possible return value for XCom.

Contributor Author

Thanks, @turbaszek, for pointing this out - I just added a test & support for None.

@turbaszek
Member

What are your thoughts on isolating the JSON serialization logic in a separate "lightweight" module?

I think it's a good idea. @kaxil you are our serialization expert, what do you think?

tatiana added 4 commits July 21, 2020 00:49
Based on airflow/serialization/serialized_objects.py:BaseSerialization
The purpose was to create a lightweight module which could be used
by XCom in order to serialise sets and other structures which contain
nested sets.

Relates to issue: #8703
when using JSON serialization.

Resolves: #8703
@tatiana
Contributor Author

tatiana commented Jul 21, 2020

Hm, let me take a look at the failing lineage tests...

Thanks, @turbaszek, I'm struggling to reproduce the issues from the CI locally. I tried to run ./breeze --python 3.6 --backend mysql but had issues ("mysql db could not be reached!"). I'll re-watch the breeze video and make sure I'm not missing any step.

Do you have any ideas on why the quarantined tests might currently be getting killed? Usually, the exit code 137 relates to high memory consumption, but it isn't clear to me why the changes I introduced had this side effect 🤔 Do you have any advice?

  • Quarantined:Pg9.6,Py3.6
tests/jobs/test_backfill_job.py::TestBackfillJob::test_trigger_controller_dag 
##[error]Process completed with exit code 137.

Previously there were a few Core tests failing, but it seems they are now passing. Perhaps they were related to the NoneType support.

@turbaszek
Member

Usually, the exit code 137 relates to high memory consumption

That's usually the problem. The quarantined tests often use multiprocessing, and mixing this with flaky tests results in unstable builds. I usually try to re-run quarantined tests locally if I suspect that my change may impact those tests.
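
For reference, 137 follows the usual shell convention of 128 plus the signal number, so it points at signal 9 (SIGKILL), which is what the Linux OOM killer sends. A minimal sketch of that arithmetic, assuming a Linux runner (signal numbers are platform-specific):

```python
import signal

exit_code = 137

# Shells report "killed by signal N" as 128 + N.
signal_number = exit_code - 128
signal_name = signal.Signals(signal_number).name

print(signal_number, signal_name)
```

On Linux this prints `9 SIGKILL`. It only says the process was killed, not who killed it, so memory pressure remains a likely explanation rather than a confirmed one.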

Member

@kaxil kaxil left a comment

I am taking a look at this now. Preferably I would use the serialized_objects module and avoid the duplication.

Let me test something and will post my findings soon.

Member

@kaxil kaxil left a comment

What do you guys think about the following patch:

curl https://pastebin.com/raw/p6Dq8aiK | git am

@turbaszek
Member

@kaxil I'm OK with the proposed approach as long as pylint is happy. Generally speaking, we will introduce a cyclic dependency, but a working one, by adding

from airflow.serialization.serialized_objects import BaseSerialization

in the XCom serialize and deserialize methods.
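
To illustrate why a function-level import gives a "working" cycle, here is a small self-contained sketch. It does not use Airflow at all: it writes hypothetical miniature modules (`*_models`, `*_xcom`, `*_serialization`) to a temporary directory, once with a top-level import and once with the import deferred into the function body.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Hypothetical stand-ins; the names only mimic the Airflow module layout.
MODELS = "import {p}_xcom\n\nclass BaseOperator:\n    pass\n"
SERIALIZATION = (
    "from {p}_models import BaseOperator\n\n"
    "def to_json(value):\n"
    "    return sorted(value)\n"
)
EAGER_XCOM = (
    "from {p}_serialization import to_json  # top-level import\n\n"
    "def serialize_value(value):\n"
    "    return to_json(value)\n"
)
LAZY_XCOM = (
    "def serialize_value(value):\n"
    "    from {p}_serialization import to_json  # deferred import\n"
    "    return to_json(value)\n"
)


def build(root: Path, prefix: str, xcom_source: str) -> None:
    """Write the three miniature modules for one variant."""
    sources = (("models", MODELS), ("serialization", SERIALIZATION), ("xcom", xcom_source))
    for name, source in sources:
        (root / f"{prefix}_{name}.py").write_text(source.format(p=prefix))


def try_import(prefix: str) -> str:
    """Import the models module and, if that works, serialize a set."""
    try:
        importlib.import_module(f"{prefix}_models")
    except ImportError as exc:
        return f"ImportError: {exc}"
    xcom = importlib.import_module(f"{prefix}_xcom")
    return f"ok: {xcom.serialize_value({3, 1, 2})}"


root = Path(tempfile.mkdtemp())
build(root, "eager", EAGER_XCOM)
build(root, "lazy", LAZY_XCOM)
sys.path.insert(0, str(root))
importlib.invalidate_caches()

eager_result = try_import("eager")  # fails: BaseOperator is not defined yet
lazy_result = try_import("lazy")    # works: the import runs at call time
print(eager_result)
print(lazy_result)
```

The eager variant fails with the same kind of "cannot import name 'BaseOperator'" error as in the traceback, because the serialization module is loaded while the models module is only partially initialised. The lazy variant still has the cycle in its dependency graph, which is why pylint may flag it, but the import only runs once every module has finished loading.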

@tatiana
Contributor Author

tatiana commented Jul 21, 2020

Thank you both for taking the time to review and give feedback.

@kaxil as mentioned by @turbaszek, it would be great if we could avoid introducing this cyclic dependency.

I fully agree with avoiding duplicating the code as well.

How would you both feel if I tried to extract BaseSerialization, keeping its current interfaces, to another module, which would be used by both xcom.py and serialized_objects.py? If you like this possibility, how would you like to name the file which would contain the BaseSerialization class?

@kaxil
Member

kaxil commented Jul 21, 2020

@kaxil I'm OK with the proposed approach as long as pylint is happy. Generally speaking, we will introduce a cyclic dependency, but a working one, by adding

from airflow.serialization.serialized_objects import BaseSerialization

in the XCom serialize and deserialize methods.

We previously had base_serialization, serialized_operator and serialized_dag in 3 separate files, but that suffered from some cyclic dependencies. We logically grouped them to avoid that, but since serialization touches almost all of the models, it gets tricky.

Check #6718 where that was fixed. If just moving BaseSerialization to a separate module helps fix the cyclic dependencies I would be happy, but please do check that removing one cyclic dependency doesn't introduce another.

@turbaszek
Member

@tatiana let's check if Kaxil's proposal is accepted by pylint. If yes, let's use it to solve this issue. After that, we can think about extracting this logic.

@stale

stale bot commented Sep 5, 2020

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Sep 5, 2020
@kaxil
Member

kaxil commented Sep 8, 2020

any updates?

@stale stale bot removed the stale Stale PRs per the .github/workflows/stale.yml policy file label Sep 8, 2020
@kaxil kaxil added the pinned Protect from Stalebot auto closing label Sep 8, 2020
@kaxil
Member

kaxil commented Oct 30, 2020

It's been a while, any updates on this one @tatiana ?

@kaxil kaxil added this to the Airflow 2.0.0 milestone Oct 30, 2020
@potiuk
Member

potiuk commented Dec 7, 2020

@tatiana - we would love to merge it before 2.0.0rc1 this week. Would it be possible for you to rebase it and add the last fix from @kaxil ?

@kaxil kaxil removed the pinned Protect from Stalebot auto closing label Dec 7, 2020
@ashb ashb modified the milestones: Airflow 2.0.0rc1, Airflow 2.1 Dec 8, 2020
@github-actions

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Mar 17, 2021
@github-actions github-actions bot closed this Mar 22, 2021
@kaxil kaxil removed this from the Airflow 2.1 milestone May 11, 2021

Labels

area:serialization stale Stale PRs per the .github/workflows/stale.yml policy file

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support for set in XCom serialization

5 participants