diff --git a/airflow/__init__.py b/airflow/__init__.py index bc6a7bbe19f36..d010fe4c74840 100644 --- a/airflow/__init__.py +++ b/airflow/__init__.py @@ -32,8 +32,8 @@ import sys -from airflow import configuration as conf -from airflow import settings +# flake8: noqa: F401 +from airflow import settings, configuration as conf from airflow.models import DAG from flask_admin import BaseView from importlib import import_module diff --git a/airflow/contrib/auth/backends/github_enterprise_auth.py b/airflow/contrib/auth/backends/github_enterprise_auth.py index 641b81e46da63..08fa0d7929d2c 100644 --- a/airflow/contrib/auth/backends/github_enterprise_auth.py +++ b/airflow/contrib/auth/backends/github_enterprise_auth.py @@ -19,18 +19,14 @@ import flask_login # Need to expose these downstream -# pylint: disable=unused-import -from flask_login import (current_user, - logout_user, - login_required, - login_user) -# pylint: enable=unused-import +# flake8: noqa: F401 +from flask_login import current_user, logout_user, login_required, login_user from flask import url_for, redirect, request from flask_oauthlib.client import OAuth -from airflow import models, configuration, settings +from airflow import models, configuration from airflow.configuration import AirflowConfigException from airflow.utils.db import provide_session from airflow.utils.log.logging_mixin import LoggingMixin diff --git a/airflow/contrib/auth/backends/google_auth.py b/airflow/contrib/auth/backends/google_auth.py index e41934b926c31..08b29e383a76a 100644 --- a/airflow/contrib/auth/backends/google_auth.py +++ b/airflow/contrib/auth/backends/google_auth.py @@ -19,18 +19,14 @@ import flask_login # Need to expose these downstream -# pylint: disable=unused-import -from flask_login import (current_user, - logout_user, - login_required, - login_user) -# pylint: enable=unused-import +# flake8: noqa: F401 +from flask_login import current_user, logout_user, login_required, login_user from flask import url_for, redirect, request from flask_oauthlib.client import OAuth -from airflow import models, configuration, settings +from airflow import models, configuration from airflow.utils.db import provide_session from airflow.utils.log.logging_mixin import LoggingMixin diff --git a/airflow/contrib/auth/backends/kerberos_auth.py b/airflow/contrib/auth/backends/kerberos_auth.py index fdb6204967dd4..4a019eb131f56 100644 --- a/airflow/contrib/auth/backends/kerberos_auth.py +++ b/airflow/contrib/auth/backends/kerberos_auth.py @@ -21,8 +21,7 @@ import flask_login from flask_login import current_user from flask import flash -from wtforms import ( - Form, PasswordField, StringField) +from wtforms import Form, PasswordField, StringField from wtforms.validators import InputRequired # pykerberos should be used as it verifies the KDC, the "kerberos" module does not do so @@ -32,7 +31,6 @@ from flask import url_for, redirect -from airflow import settings from airflow import models from airflow import configuration from airflow.utils.db import provide_session diff --git a/airflow/contrib/auth/backends/ldap_auth.py b/airflow/contrib/auth/backends/ldap_auth.py index 516e121c9b26d..a949e89a77b9e 100644 --- a/airflow/contrib/auth/backends/ldap_auth.py +++ b/airflow/contrib/auth/backends/ldap_auth.py @@ -19,13 +19,12 @@ from future.utils import native import flask_login -from flask_login import login_required, current_user, logout_user +from flask_login import login_required, current_user, logout_user # noqa: F401 from flask import flash -from wtforms import ( - Form, PasswordField, StringField) +from wtforms import Form, PasswordField, StringField from wtforms.validators import InputRequired -from ldap3 import Server, Connection, Tls, LEVEL, SUBTREE, BASE +from ldap3 import Server, Connection, Tls, LEVEL, SUBTREE import ssl from flask import url_for, redirect diff --git a/airflow/contrib/operators/mlengine_prediction_summary.py b/airflow/contrib/operators/mlengine_prediction_summary.py index 5dac0a44a9dcb..def793c1be001 100644 --- a/airflow/contrib/operators/mlengine_prediction_summary.py +++ b/airflow/contrib/operators/mlengine_prediction_summary.py @@ -1,3 +1,4 @@ +# flake8: noqa: F841 # # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with diff --git a/airflow/default_login.py b/airflow/default_login.py index d44dbf39ea9b3..bf87bbc47fa76 100644 --- a/airflow/default_login.py +++ b/airflow/default_login.py @@ -25,11 +25,11 @@ """ import flask_login -from flask_login import login_required, current_user, logout_user +from flask_login import login_required, current_user, logout_user # noqa: F401 from flask import url_for, redirect -from airflow import settings +from airflow import settings # noqa: F401 from airflow import models from airflow.utils.db import provide_session @@ -64,9 +64,6 @@ def is_superuser(self): """Access all the things""" return True -# models.User = User # hack! -# del User - @login_manager.user_loader @provide_session diff --git a/airflow/jobs.py b/airflow/jobs.py index d51e7c25372d5..5bcd3e84070c0 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -658,7 +658,7 @@ def manage_slas(self, dag, session=None): slas = ( session .query(SlaMiss) - .filter(SlaMiss.notification_sent == False) + .filter(SlaMiss.notification_sent == False) # noqa: E712 .filter(SlaMiss.dag_id == dag.dag_id) .all() ) @@ -708,16 +708,13 @@ def manage_slas(self, dag, session=None): Blocking tasks:
{blocking_task_list}\n{bug}
""".format(bug=asciiart.bug, **locals()) - emails = [] - for t in dag.tasks: - if t.email: - if isinstance(t.email, basestring): - l = [t.email] - elif isinstance(t.email, (list, tuple)): - l = t.email - for email in l: - if email not in emails: - emails.append(email) + emails = set() + for task in dag.tasks: + if task.email: + if isinstance(task.email, basestring): + emails.add(task.email) + elif isinstance(task.email, (list, tuple)): + emails |= set(task.email) if emails and len(slas): try: send_email( @@ -818,7 +815,7 @@ def create_dag_run(self, dag, session=None): session.query(func.max(DagRun.execution_date)) .filter_by(dag_id=dag.dag_id) .filter(or_( - DagRun.external_trigger == False, + DagRun.external_trigger == False, # noqa: E712 # add % as a wildcard for the like query DagRun.run_id.like(DagRun.ID_PREFIX + '%') )) @@ -1089,14 +1086,16 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None): DR, and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date) ) - .filter(or_(DR.run_id == None, # noqa E711 + .filter(or_(DR.run_id == None, # noqa: E711 not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%')))) .outerjoin(DM, DM.dag_id == TI.dag_id) - .filter(or_(DM.dag_id == None, # noqa E711 + .filter(or_(DM.dag_id == None, # noqa: E711 not_(DM.is_paused))) ) if None in states: - ti_query = ti_query.filter(or_(TI.state == None, TI.state.in_(states))) # noqa E711 + ti_query = ti_query.filter( + or_(TI.state == None, TI.state.in_(states)) # noqa: E711 + ) else: ti_query = ti_query.filter(TI.state.in_(states)) @@ -1184,8 +1183,8 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None): ) if current_task_concurrency >= task_concurrency_limit: self.log.info( - "Not executing %s since the number of tasks running or queued from DAG %s" - " is >= to the DAG's task concurrency limit of %s", + "Not executing %s since the number of tasks running or queued " + "from DAG %s is >= to the DAG's task concurrency limit of %s", task_instance, dag_id, task_concurrency_limit ) continue @@ -1261,7 +1260,7 @@ def _change_state_for_executable_task_instances(self, task_instances, if None in acceptable_states: ti_query = ti_query.filter( - or_(TI.state == None, TI.state.in_(acceptable_states)) # noqa E711 + or_(TI.state == None, TI.state.in_(acceptable_states)) # noqa: E711 ) else: ti_query = ti_query.filter(TI.state.in_(acceptable_states)) diff --git a/airflow/migrations/env.py b/airflow/migrations/env.py index 97ebe4257f2c4..4e1977635a12f 100644 --- a/airflow/migrations/env.py +++ b/airflow/migrations/env.py @@ -85,6 +85,7 @@ def run_migrations_online(): with context.begin_transaction(): context.run_migrations() + if context.is_offline_mode(): run_migrations_offline() else: diff --git a/airflow/migrations/versions/05f30312d566_merge_heads.py b/airflow/migrations/versions/05f30312d566_merge_heads.py index 78d5652679ca9..f869cb8c08e5a 100644 --- a/airflow/migrations/versions/05f30312d566_merge_heads.py +++ b/airflow/migrations/versions/05f30312d566_merge_heads.py @@ -31,9 +31,6 @@ branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa - def upgrade(): pass diff --git a/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py b/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py index 64ee41c44d9e2..567172f9bf68e 100644 --- a/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py +++ b/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py @@ -25,16 +25,16 @@ """ +from alembic import op +from sqlalchemy.dialects import mysql +import sqlalchemy as sa + # revision identifiers, used by Alembic. revision = '0e2a74e0fc9f' down_revision = 'd2ae31099d61' branch_labels = None depends_on = None -from alembic import op -from sqlalchemy.dialects import mysql -import sqlalchemy as sa - def upgrade(): conn = op.get_bind() @@ -69,14 +69,16 @@ def upgrade(): op.alter_column(table_name='log', column_name='dttm', type_=mysql.TIMESTAMP(fsp=6)) op.alter_column(table_name='log', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6)) - op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6), nullable=False) + op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6), + nullable=False) op.alter_column(table_name='sla_miss', column_name='timestamp', type_=mysql.TIMESTAMP(fsp=6)) op.alter_column(table_name='task_fail', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6)) op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6)) op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6)) - op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6), nullable=False) + op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.TIMESTAMP(fsp=6), + nullable=False) op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.TIMESTAMP(fsp=6)) op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.TIMESTAMP(fsp=6)) op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.TIMESTAMP(fsp=6)) @@ -117,14 +119,16 @@ def upgrade(): op.alter_column(table_name='log', column_name='dttm', type_=sa.TIMESTAMP(timezone=True)) op.alter_column(table_name='log', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True)) - op.alter_column(table_name='sla_miss', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True), nullable=False) + op.alter_column(table_name='sla_miss', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True), + nullable=False) op.alter_column(table_name='sla_miss', column_name='timestamp', type_=sa.TIMESTAMP(timezone=True)) op.alter_column(table_name='task_fail', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True)) op.alter_column(table_name='task_fail', column_name='start_date', type_=sa.TIMESTAMP(timezone=True)) op.alter_column(table_name='task_fail', column_name='end_date', type_=sa.TIMESTAMP(timezone=True)) - op.alter_column(table_name='task_instance', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True), nullable=False) + op.alter_column(table_name='task_instance', column_name='execution_date', type_=sa.TIMESTAMP(timezone=True), + nullable=False) op.alter_column(table_name='task_instance', column_name='start_date', type_=sa.TIMESTAMP(timezone=True)) op.alter_column(table_name='task_instance', column_name='end_date', type_=sa.TIMESTAMP(timezone=True)) op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=sa.TIMESTAMP(timezone=True)) @@ -161,14 +165,16 @@ def downgrade(): op.alter_column(table_name='log', column_name='dttm', type_=mysql.DATETIME(fsp=6)) op.alter_column(table_name='log', column_name='execution_date', type_=mysql.DATETIME(fsp=6)) - op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.DATETIME(fsp=6), nullable=False) + op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.DATETIME(fsp=6), + nullable=False) op.alter_column(table_name='sla_miss', column_name='DATETIME', type_=mysql.DATETIME(fsp=6)) op.alter_column(table_name='task_fail', column_name='execution_date', type_=mysql.DATETIME(fsp=6)) op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.DATETIME(fsp=6)) op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.DATETIME(fsp=6)) - op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.DATETIME(fsp=6), nullable=False) + op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.DATETIME(fsp=6), + nullable=False) op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.DATETIME(fsp=6)) op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.DATETIME(fsp=6)) op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.DATETIME(fsp=6)) diff --git a/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py b/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py index 6ee50aa94da81..288a0b60aa821 100644 --- a/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py +++ b/airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py @@ -23,6 +23,7 @@ Create Date: 2017-01-25 11:43:51.635667 """ +from alembic import op # revision identifiers, used by Alembic. revision = '127d2bf2dfa7' @@ -30,8 +31,6 @@ branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa def upgrade(): op.create_index('dag_id_state', 'dag_run', ['dag_id', 'state'], unique=False) diff --git a/airflow/migrations/versions/1507a7289a2f_create_is_encrypted.py b/airflow/migrations/versions/1507a7289a2f_create_is_encrypted.py index 3db0d41190408..fe84254c38caa 100644 --- a/airflow/migrations/versions/1507a7289a2f_create_is_encrypted.py +++ b/airflow/migrations/versions/1507a7289a2f_create_is_encrypted.py @@ -24,6 +24,9 @@ Create Date: 2015-08-18 18:57:51.927315 """ +from alembic import op +import sqlalchemy as sa +from sqlalchemy.engine.reflection import Inspector # revision identifiers, used by Alembic. revision = '1507a7289a2f' @@ -31,10 +34,6 @@ branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa -from sqlalchemy.engine.reflection import Inspector - connectionhelper = sa.Table( 'connection', sa.MetaData(), diff --git a/airflow/migrations/versions/1968acfc09e3_add_is_encrypted_column_to_variable_.py b/airflow/migrations/versions/1968acfc09e3_add_is_encrypted_column_to_variable_.py index aaf938a0355ee..16ab349563428 100644 --- a/airflow/migrations/versions/1968acfc09e3_add_is_encrypted_column_to_variable_.py +++ b/airflow/migrations/versions/1968acfc09e3_add_is_encrypted_column_to_variable_.py @@ -24,6 +24,8 @@ Create Date: 2016-02-02 17:20:55.692295 """ +from alembic import op +import sqlalchemy as sa # revision identifiers, used by Alembic. revision = '1968acfc09e3' @@ -31,12 +33,9 @@ branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa - def upgrade(): - op.add_column('variable', sa.Column('is_encrypted', sa.Boolean,default=False)) + op.add_column('variable', sa.Column('is_encrypted', sa.Boolean, default=False)) def downgrade(): diff --git a/airflow/migrations/versions/1b38cef5b76e_add_dagrun.py b/airflow/migrations/versions/1b38cef5b76e_add_dagrun.py index 79fcff454114a..50d53652c4734 100644 --- a/airflow/migrations/versions/1b38cef5b76e_add_dagrun.py +++ b/airflow/migrations/versions/1b38cef5b76e_add_dagrun.py @@ -25,28 +25,27 @@ """ +from alembic import op +import sqlalchemy as sa + # revision identifiers, used by Alembic. revision = '1b38cef5b76e' down_revision = '502898887f84' branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa - def upgrade(): op.create_table('dag_run', - sa.Column('id', sa.Integer(), nullable=False), - sa.Column('dag_id', sa.String(length=250), nullable=True), - sa.Column('execution_date', sa.DateTime(), nullable=True), - sa.Column('state', sa.String(length=50), nullable=True), - sa.Column('run_id', sa.String(length=250), nullable=True), - sa.Column('external_trigger', sa.Boolean(), nullable=True), - sa.PrimaryKeyConstraint('id'), - sa.UniqueConstraint('dag_id', 'execution_date'), - sa.UniqueConstraint('dag_id', 'run_id'), - ) + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('dag_id', sa.String(length=250), nullable=True), + sa.Column('execution_date', sa.DateTime(), nullable=True), + sa.Column('state', sa.String(length=50), nullable=True), + sa.Column('run_id', sa.String(length=250), nullable=True), + sa.Column('external_trigger', sa.Boolean(), nullable=True), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('dag_id', 'execution_date'), + sa.UniqueConstraint('dag_id', 'run_id')) def downgrade(): diff --git a/airflow/migrations/versions/211e584da130_add_ti_state_index.py b/airflow/migrations/versions/211e584da130_add_ti_state_index.py index afc600d58aca1..b17f390e0b65b 100644 --- a/airflow/migrations/versions/211e584da130_add_ti_state_index.py +++ b/airflow/migrations/versions/211e584da130_add_ti_state_index.py @@ -24,6 +24,7 @@ Create Date: 2016-06-30 10:54:24.323588 """ +from alembic import op # revision identifiers, used by Alembic. revision = '211e584da130' @@ -31,9 +32,6 @@ branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa - def upgrade(): op.create_index('ti_state', 'task_instance', ['state'], unique=False) diff --git a/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py b/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py index 27a9f593b54df..a757d2770971b 100644 --- a/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py +++ b/airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py @@ -26,18 +26,16 @@ """ +from alembic import op +import sqlalchemy as sa +import dill + # revision identifiers, used by Alembic. revision = '27c6a30d7c24' down_revision = '33ae817a1ff4' branch_labels = None depends_on = None - -from alembic import op -import sqlalchemy as sa -import dill - - TASK_INSTANCE_TABLE = "task_instance" NEW_COLUMN = "executor_config" @@ -48,4 +46,3 @@ def upgrade(): def downgrade(): op.drop_column(TASK_INSTANCE_TABLE, NEW_COLUMN) - diff --git a/airflow/migrations/versions/2e541a1dcfed_task_duration.py b/airflow/migrations/versions/2e541a1dcfed_task_duration.py index 6b24ef66e4ab2..595a5774a6b27 100644 --- a/airflow/migrations/versions/2e541a1dcfed_task_duration.py +++ b/airflow/migrations/versions/2e541a1dcfed_task_duration.py @@ -25,16 +25,16 @@ """ +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import mysql + # revision identifiers, used by Alembic. revision = '2e541a1dcfed' down_revision = '1b38cef5b76e' branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa -from sqlalchemy.dialects import mysql - def upgrade(): # use batch_alter_table to support SQLite workaround diff --git a/airflow/migrations/versions/2e82aab8ef20_rename_user_table.py b/airflow/migrations/versions/2e82aab8ef20_rename_user_table.py index 75db27cdb3a8c..fc8a1aab20e69 100644 --- a/airflow/migrations/versions/2e82aab8ef20_rename_user_table.py +++ b/airflow/migrations/versions/2e82aab8ef20_rename_user_table.py @@ -24,6 +24,7 @@ Create Date: 2016-04-02 19:28:15.211915 """ +from alembic import op # revision identifiers, used by Alembic. revision = '2e82aab8ef20' @@ -31,9 +32,6 @@ branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa - def upgrade(): op.rename_table('user', 'users') diff --git a/airflow/migrations/versions/338e90f54d61_more_logging_into_task_isntance.py b/airflow/migrations/versions/338e90f54d61_more_logging_into_task_isntance.py index 4f1364b971a57..473f76778b89d 100644 --- a/airflow/migrations/versions/338e90f54d61_more_logging_into_task_isntance.py +++ b/airflow/migrations/versions/338e90f54d61_more_logging_into_task_isntance.py @@ -17,13 +17,17 @@ # specific language governing permissions and limitations # under the License. -"""More logging into task_isntance +"""More logging into task_instance Revision ID: 338e90f54d61 Revises: 13eb55f81627 Create Date: 2015-08-25 06:09:20.460147 """ +# flake8: noqa: E266 + +from alembic import op +import sqlalchemy as sa # revision identifiers, used by Alembic. revision = '338e90f54d61' @@ -31,9 +35,6 @@ branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa - def upgrade(): ### commands auto generated by Alembic - please adjust! ### diff --git a/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py b/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py index c489c05f7ea39..925bf26df06cf 100644 --- a/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py +++ b/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py @@ -25,6 +25,8 @@ Create Date: 2017-09-11 15:26:47.598494 """ +from alembic import op +import sqlalchemy as sa # revision identifiers, used by Alembic. revision = '33ae817a1ff4' @@ -32,10 +34,6 @@ branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa - - RESOURCE_TABLE = "kube_resource_version" @@ -53,4 +51,3 @@ def upgrade(): def downgrade(): op.drop_table(RESOURCE_TABLE) - diff --git a/airflow/migrations/versions/40e67319e3a9_dagrun_config.py b/airflow/migrations/versions/40e67319e3a9_dagrun_config.py index 02ea51501c49f..3da4d5f543038 100644 --- a/airflow/migrations/versions/40e67319e3a9_dagrun_config.py +++ b/airflow/migrations/versions/40e67319e3a9_dagrun_config.py @@ -24,6 +24,8 @@ Create Date: 2015-10-29 08:36:31.726728 """ +from alembic import op +import sqlalchemy as sa # revision identifiers, used by Alembic. revision = '40e67319e3a9' @@ -31,9 +33,6 @@ branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa - def upgrade(): op.add_column('dag_run', sa.Column('conf', sa.PickleType(), nullable=True)) diff --git a/airflow/migrations/versions/4446e08588_dagrun_start_end.py b/airflow/migrations/versions/4446e08588_dagrun_start_end.py index 101f2ad9055bf..29932c92060e9 100644 --- a/airflow/migrations/versions/4446e08588_dagrun_start_end.py +++ b/airflow/migrations/versions/4446e08588_dagrun_start_end.py @@ -25,15 +25,15 @@ """ +from alembic import op +import sqlalchemy as sa + # revision identifiers, used by Alembic. revision = '4446e08588' down_revision = '561833c1c74b' branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa - def upgrade(): op.add_column('dag_run', sa.Column('end_date', sa.DateTime(), nullable=True)) diff --git a/airflow/migrations/versions/4addfa1236f1_add_fractional_seconds_to_mysql_tables.py b/airflow/migrations/versions/4addfa1236f1_add_fractional_seconds_to_mysql_tables.py index c3898e95969b2..655ff61042d1e 100644 --- a/airflow/migrations/versions/4addfa1236f1_add_fractional_seconds_to_mysql_tables.py +++ b/airflow/migrations/versions/4addfa1236f1_add_fractional_seconds_to_mysql_tables.py @@ -24,93 +24,147 @@ """ +from alembic import op +from sqlalchemy.dialects import mysql +from alembic import context + # revision identifiers, used by Alembic. revision = '4addfa1236f1' down_revision = 'f2ca10b85618' branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa -from sqlalchemy.dialects import mysql -from alembic import context - def upgrade(): if context.config.get_main_option('sqlalchemy.url').startswith('mysql'): - op.alter_column(table_name='dag', column_name='last_scheduler_run', type_=mysql.DATETIME(fsp=6)) - op.alter_column(table_name='dag', column_name='last_pickled', type_=mysql.DATETIME(fsp=6)) - op.alter_column(table_name='dag', column_name='last_expired', type_=mysql.DATETIME(fsp=6)) - - op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=mysql.DATETIME(fsp=6)) - - op.alter_column(table_name='dag_run', column_name='execution_date', type_=mysql.DATETIME(fsp=6)) - op.alter_column(table_name='dag_run', column_name='start_date', type_=mysql.DATETIME(fsp=6)) - op.alter_column(table_name='dag_run', column_name='end_date', type_=mysql.DATETIME(fsp=6)) - - op.alter_column(table_name='import_error', column_name='timestamp', type_=mysql.DATETIME(fsp=6)) - - op.alter_column(table_name='job', column_name='start_date', type_=mysql.DATETIME(fsp=6)) - op.alter_column(table_name='job', column_name='end_date', type_=mysql.DATETIME(fsp=6)) - op.alter_column(table_name='job', column_name='latest_heartbeat', type_=mysql.DATETIME(fsp=6)) - - op.alter_column(table_name='known_event', column_name='start_date', type_=mysql.DATETIME(fsp=6)) - op.alter_column(table_name='known_event', column_name='end_date', type_=mysql.DATETIME(fsp=6)) - - op.alter_column(table_name='log', column_name='dttm', type_=mysql.DATETIME(fsp=6)) - op.alter_column(table_name='log', column_name='execution_date', type_=mysql.DATETIME(fsp=6)) - - op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.DATETIME(fsp=6), nullable=False) - op.alter_column(table_name='sla_miss', column_name='timestamp', type_=mysql.DATETIME(fsp=6)) - - op.alter_column(table_name='task_fail', column_name='execution_date', type_=mysql.DATETIME(fsp=6)) - op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.DATETIME(fsp=6)) - op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.DATETIME(fsp=6)) - - op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.DATETIME(fsp=6), nullable=False) - op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.DATETIME(fsp=6)) - op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.DATETIME(fsp=6)) - op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.DATETIME(fsp=6)) - - op.alter_column(table_name='xcom', column_name='timestamp', type_=mysql.DATETIME(fsp=6)) - op.alter_column(table_name='xcom', column_name='execution_date', type_=mysql.DATETIME(fsp=6)) + op.alter_column(table_name='dag', column_name='last_scheduler_run', + type_=mysql.DATETIME(fsp=6)) + op.alter_column(table_name='dag', column_name='last_pickled', + type_=mysql.DATETIME(fsp=6)) + op.alter_column(table_name='dag', column_name='last_expired', + type_=mysql.DATETIME(fsp=6)) + + op.alter_column(table_name='dag_pickle', column_name='created_dttm', + type_=mysql.DATETIME(fsp=6)) + + op.alter_column(table_name='dag_run', column_name='execution_date', + type_=mysql.DATETIME(fsp=6)) + op.alter_column(table_name='dag_run', column_name='start_date', + type_=mysql.DATETIME(fsp=6)) + op.alter_column(table_name='dag_run', column_name='end_date', + type_=mysql.DATETIME(fsp=6)) + + op.alter_column(table_name='import_error', column_name='timestamp', + type_=mysql.DATETIME(fsp=6)) + + op.alter_column(table_name='job', column_name='start_date', + type_=mysql.DATETIME(fsp=6)) + op.alter_column(table_name='job', column_name='end_date', + type_=mysql.DATETIME(fsp=6)) + op.alter_column(table_name='job', column_name='latest_heartbeat', + type_=mysql.DATETIME(fsp=6)) + + op.alter_column(table_name='known_event', column_name='start_date', + type_=mysql.DATETIME(fsp=6)) + op.alter_column(table_name='known_event', column_name='end_date', + type_=mysql.DATETIME(fsp=6)) + + op.alter_column(table_name='log', column_name='dttm', + type_=mysql.DATETIME(fsp=6)) + op.alter_column(table_name='log', column_name='execution_date', + type_=mysql.DATETIME(fsp=6)) + + op.alter_column(table_name='sla_miss', column_name='execution_date', + type_=mysql.DATETIME(fsp=6), + nullable=False) + op.alter_column(table_name='sla_miss', column_name='timestamp', + type_=mysql.DATETIME(fsp=6)) + + op.alter_column(table_name='task_fail', column_name='execution_date', + type_=mysql.DATETIME(fsp=6)) + op.alter_column(table_name='task_fail', column_name='start_date', + type_=mysql.DATETIME(fsp=6)) + op.alter_column(table_name='task_fail', column_name='end_date', + type_=mysql.DATETIME(fsp=6)) + + op.alter_column(table_name='task_instance', column_name='execution_date', + type_=mysql.DATETIME(fsp=6), + nullable=False) + op.alter_column(table_name='task_instance', column_name='start_date', + type_=mysql.DATETIME(fsp=6)) + op.alter_column(table_name='task_instance', column_name='end_date', + type_=mysql.DATETIME(fsp=6)) + op.alter_column(table_name='task_instance', column_name='queued_dttm', + type_=mysql.DATETIME(fsp=6)) + + op.alter_column(table_name='xcom', column_name='timestamp', + type_=mysql.DATETIME(fsp=6)) + op.alter_column(table_name='xcom', column_name='execution_date', + type_=mysql.DATETIME(fsp=6)) def downgrade(): if context.config.get_main_option('sqlalchemy.url').startswith('mysql'): - op.alter_column(table_name='dag', column_name='last_scheduler_run', type_=mysql.DATETIME()) - op.alter_column(table_name='dag', column_name='last_pickled', type_=mysql.DATETIME()) - op.alter_column(table_name='dag', column_name='last_expired', type_=mysql.DATETIME()) - - op.alter_column(table_name='dag_pickle', column_name='created_dttm', type_=mysql.DATETIME()) - - op.alter_column(table_name='dag_run', column_name='execution_date', type_=mysql.DATETIME()) - op.alter_column(table_name='dag_run', column_name='start_date', type_=mysql.DATETIME()) - op.alter_column(table_name='dag_run', column_name='end_date', type_=mysql.DATETIME()) - - op.alter_column(table_name='import_error', column_name='timestamp', type_=mysql.DATETIME()) - - op.alter_column(table_name='job', column_name='start_date', type_=mysql.DATETIME()) - op.alter_column(table_name='job', column_name='end_date', type_=mysql.DATETIME()) - op.alter_column(table_name='job', column_name='latest_heartbeat', type_=mysql.DATETIME()) - - op.alter_column(table_name='known_event', column_name='start_date', type_=mysql.DATETIME()) - op.alter_column(table_name='known_event', column_name='end_date', type_=mysql.DATETIME()) - - op.alter_column(table_name='log', column_name='dttm', type_=mysql.DATETIME()) - op.alter_column(table_name='log', column_name='execution_date', type_=mysql.DATETIME()) - - op.alter_column(table_name='sla_miss', column_name='execution_date', type_=mysql.DATETIME(), nullable=False) - op.alter_column(table_name='sla_miss', column_name='timestamp', type_=mysql.DATETIME()) - - op.alter_column(table_name='task_fail', column_name='execution_date', type_=mysql.DATETIME()) - op.alter_column(table_name='task_fail', column_name='start_date', type_=mysql.DATETIME()) - op.alter_column(table_name='task_fail', column_name='end_date', type_=mysql.DATETIME()) - - op.alter_column(table_name='task_instance', column_name='execution_date', type_=mysql.DATETIME(), nullable=False) - op.alter_column(table_name='task_instance', column_name='start_date', type_=mysql.DATETIME()) - op.alter_column(table_name='task_instance', column_name='end_date', type_=mysql.DATETIME()) - op.alter_column(table_name='task_instance', column_name='queued_dttm', type_=mysql.DATETIME()) - - op.alter_column(table_name='xcom', column_name='timestamp', type_=mysql.DATETIME()) - op.alter_column(table_name='xcom', column_name='execution_date', type_=mysql.DATETIME()) + op.alter_column(table_name='dag', column_name='last_scheduler_run', + type_=mysql.DATETIME()) + op.alter_column(table_name='dag', column_name='last_pickled', + type_=mysql.DATETIME()) + op.alter_column(table_name='dag', column_name='last_expired', + type_=mysql.DATETIME()) + + op.alter_column(table_name='dag_pickle', column_name='created_dttm', + type_=mysql.DATETIME()) + + op.alter_column(table_name='dag_run', column_name='execution_date', + type_=mysql.DATETIME()) + op.alter_column(table_name='dag_run', column_name='start_date', + type_=mysql.DATETIME()) + op.alter_column(table_name='dag_run', column_name='end_date', + type_=mysql.DATETIME()) + + op.alter_column(table_name='import_error', column_name='timestamp', + type_=mysql.DATETIME()) + + op.alter_column(table_name='job', column_name='start_date', + type_=mysql.DATETIME()) + op.alter_column(table_name='job', column_name='end_date', + type_=mysql.DATETIME()) + op.alter_column(table_name='job', column_name='latest_heartbeat', + type_=mysql.DATETIME()) + + op.alter_column(table_name='known_event', column_name='start_date', + type_=mysql.DATETIME()) + op.alter_column(table_name='known_event', column_name='end_date', + type_=mysql.DATETIME()) + + op.alter_column(table_name='log', column_name='dttm', + type_=mysql.DATETIME()) + op.alter_column(table_name='log', column_name='execution_date', + type_=mysql.DATETIME()) + + op.alter_column(table_name='sla_miss', column_name='execution_date', + type_=mysql.DATETIME(), nullable=False) + op.alter_column(table_name='sla_miss', column_name='timestamp', + type_=mysql.DATETIME()) + + op.alter_column(table_name='task_fail', column_name='execution_date', + type_=mysql.DATETIME()) + op.alter_column(table_name='task_fail', column_name='start_date', + type_=mysql.DATETIME()) + op.alter_column(table_name='task_fail', column_name='end_date', + type_=mysql.DATETIME()) + + op.alter_column(table_name='task_instance', column_name='execution_date', + type_=mysql.DATETIME(), + nullable=False) + op.alter_column(table_name='task_instance', column_name='start_date', + type_=mysql.DATETIME()) + op.alter_column(table_name='task_instance', column_name='end_date', + type_=mysql.DATETIME()) + op.alter_column(table_name='task_instance', column_name='queued_dttm', + type_=mysql.DATETIME()) + + op.alter_column(table_name='xcom', column_name='timestamp', + type_=mysql.DATETIME()) + op.alter_column(table_name='xcom', column_name='execution_date', + type_=mysql.DATETIME()) diff --git a/airflow/migrations/versions/502898887f84_adding_extra_to_log.py b/airflow/migrations/versions/502898887f84_adding_extra_to_log.py index 333b18e8d200d..632720a4e27b0 100644 --- a/airflow/migrations/versions/502898887f84_adding_extra_to_log.py +++ b/airflow/migrations/versions/502898887f84_adding_extra_to_log.py @@ -24,6 +24,8 @@ Create Date: 2015-11-03 22:50:49.794097 """ +from alembic import op +import sqlalchemy as sa # revision identifiers, used by Alembic. revision = '502898887f84' @@ -31,9 +33,6 @@ branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa - def upgrade(): op.add_column('log', sa.Column('extra', sa.Text(), nullable=True)) diff --git a/airflow/migrations/versions/52d714495f0_job_id_indices.py b/airflow/migrations/versions/52d714495f0_job_id_indices.py index 13f561cffb77f..94374cb68aa30 100644 --- a/airflow/migrations/versions/52d714495f0_job_id_indices.py +++ b/airflow/migrations/versions/52d714495f0_job_id_indices.py @@ -24,6 +24,7 @@ Create Date: 2015-10-20 03:17:01.962542 """ +from alembic import op # revision identifiers, used by Alembic. revision = '52d714495f0' @@ -31,12 +32,10 @@ branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa - def upgrade(): - op.create_index('idx_job_state_heartbeat', 'job', ['state', 'latest_heartbeat'], unique=False) + op.create_index('idx_job_state_heartbeat', 'job', + ['state', 'latest_heartbeat'], unique=False) def downgrade(): diff --git a/airflow/migrations/versions/561833c1c74b_add_password_column_to_user.py b/airflow/migrations/versions/561833c1c74b_add_password_column_to_user.py index d92db9f891b6b..a26a105ac8299 100644 --- a/airflow/migrations/versions/561833c1c74b_add_password_column_to_user.py +++ b/airflow/migrations/versions/561833c1c74b_add_password_column_to_user.py @@ -24,6 +24,8 @@ Create Date: 2015-11-30 06:51:25.872557 """ +from alembic import op +import sqlalchemy as sa # revision identifiers, used by Alembic. revision = '561833c1c74b' @@ -31,9 +33,6 @@ branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa - def upgrade(): op.add_column('user', sa.Column('password', sa.String(255))) diff --git a/airflow/migrations/versions/5e7d17757c7a_add_pid_field_to_taskinstance.py b/airflow/migrations/versions/5e7d17757c7a_add_pid_field_to_taskinstance.py index e23a0a8b2f742..77a35db48fad5 100644 --- a/airflow/migrations/versions/5e7d17757c7a_add_pid_field_to_taskinstance.py +++ b/airflow/migrations/versions/5e7d17757c7a_add_pid_field_to_taskinstance.py @@ -24,15 +24,15 @@ """ +from alembic import op +import sqlalchemy as sa + # revision identifiers, used by Alembic. revision = '5e7d17757c7a' down_revision = '8504051e801b' branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa - def upgrade(): op.add_column('task_instance', sa.Column('pid', sa.Integer)) diff --git a/airflow/migrations/versions/64de9cddf6c9_add_task_fails_journal_table.py b/airflow/migrations/versions/64de9cddf6c9_add_task_fails_journal_table.py index 6dda5df43b146..2def57e904870 100644 --- a/airflow/migrations/versions/64de9cddf6c9_add_task_fails_journal_table.py +++ b/airflow/migrations/versions/64de9cddf6c9_add_task_fails_journal_table.py @@ -23,6 +23,8 @@ Create Date: 2016-08-03 14:02:59.203021 """ +from alembic import op +import sqlalchemy as sa # revision identifiers, used by Alembic. revision = '64de9cddf6c9' @@ -30,9 +32,6 @@ branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa - def upgrade(): op.create_table( @@ -47,5 +46,6 @@ def upgrade(): sa.PrimaryKeyConstraint('id'), ) + def downgrade(): op.drop_table('task_fail') diff --git a/airflow/migrations/versions/8504051e801b_xcom_dag_task_indices.py b/airflow/migrations/versions/8504051e801b_xcom_dag_task_indices.py index d6a4514ae2883..fdcbc59df8d00 100644 --- a/airflow/migrations/versions/8504051e801b_xcom_dag_task_indices.py +++ b/airflow/migrations/versions/8504051e801b_xcom_dag_task_indices.py @@ -25,18 +25,18 @@ """ +from alembic import op + # revision identifiers, used by Alembic. revision = '8504051e801b' down_revision = '4addfa1236f1' branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa - def upgrade(): - op.create_index('idx_xcom_dag_task_date', 'xcom', ['dag_id', 'task_id', 'execution_date'], unique=False) + op.create_index('idx_xcom_dag_task_date', 'xcom', + ['dag_id', 'task_id', 'execution_date'], unique=False) def downgrade(): diff --git a/airflow/migrations/versions/856955da8476_fix_sqlite_foreign_key.py b/airflow/migrations/versions/856955da8476_fix_sqlite_foreign_key.py index 5b11dc7860ea9..52a817081be7c 100644 --- a/airflow/migrations/versions/856955da8476_fix_sqlite_foreign_key.py +++ b/airflow/migrations/versions/856955da8476_fix_sqlite_foreign_key.py @@ -25,15 +25,15 @@ """ +from alembic import op +import sqlalchemy as sa + # revision identifiers, used by Alembic. revision = '856955da8476' down_revision = 'f23433877c24' branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa - def upgrade(): conn = op.get_bind() @@ -45,41 +45,39 @@ def upgrade(): # # Use batch_alter_table to support SQLite workaround. chart_table = sa.Table('chart', - sa.MetaData(), - sa.Column('id', sa.Integer(), nullable=False), - sa.Column('label', sa.String(length=200), nullable=True), - sa.Column('conn_id', sa.String(length=250), nullable=False), - sa.Column('user_id', sa.Integer(), nullable=True), - sa.Column('chart_type', sa.String(length=100), nullable=True), - sa.Column('sql_layout', sa.String(length=50), nullable=True), - sa.Column('sql', sa.Text(), nullable=True), - sa.Column('y_log_scale', sa.Boolean(), nullable=True), - sa.Column('show_datatable', sa.Boolean(), nullable=True), - sa.Column('show_sql', sa.Boolean(), nullable=True), - sa.Column('height', sa.Integer(), nullable=True), - sa.Column('default_params', sa.String(length=5000), nullable=True), - sa.Column('x_is_date', sa.Boolean(), nullable=True), - sa.Column('iteration_no', sa.Integer(), nullable=True), - sa.Column('last_modified', sa.DateTime(), nullable=True), - sa.PrimaryKeyConstraint('id') - ) + sa.MetaData(), + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('label', sa.String(length=200), nullable=True), + sa.Column('conn_id', sa.String(length=250), nullable=False), + sa.Column('user_id', sa.Integer(), nullable=True), + sa.Column('chart_type', sa.String(length=100), nullable=True), + sa.Column('sql_layout', sa.String(length=50), nullable=True), + sa.Column('sql', sa.Text(), nullable=True), + sa.Column('y_log_scale', sa.Boolean(), nullable=True), + sa.Column('show_datatable', sa.Boolean(), nullable=True), + sa.Column('show_sql', sa.Boolean(), nullable=True), + sa.Column('height', sa.Integer(), nullable=True), + sa.Column('default_params', sa.String(length=5000), nullable=True), + sa.Column('x_is_date', sa.Boolean(), nullable=True), + sa.Column('iteration_no', sa.Integer(), nullable=True), + sa.Column('last_modified', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('id')) with op.batch_alter_table('chart', copy_from=chart_table) as batch_op: batch_op.create_foreign_key('chart_user_id_fkey', 'users', ['user_id'], ['id']) known_event_table = sa.Table('known_event', - sa.MetaData(), - sa.Column('id', sa.Integer(), nullable=False), - sa.Column('label', sa.String(length=200), nullable=True), - sa.Column('start_date', sa.DateTime(), nullable=True), - sa.Column('end_date', sa.DateTime(), nullable=True), - sa.Column('user_id', sa.Integer(), nullable=True), - sa.Column('known_event_type_id', sa.Integer(), nullable=True), - sa.Column('description', sa.Text(), nullable=True), - sa.ForeignKeyConstraint(['known_event_type_id'], - ['known_event_type.id'], ), - sa.PrimaryKeyConstraint('id') - ) + sa.MetaData(), + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('label', sa.String(length=200), nullable=True), + sa.Column('start_date', sa.DateTime(), nullable=True), + sa.Column('end_date', sa.DateTime(), nullable=True), + sa.Column('user_id', sa.Integer(), nullable=True), + sa.Column('known_event_type_id', sa.Integer(), nullable=True), + sa.Column('description', sa.Text(), nullable=True), + sa.ForeignKeyConstraint(['known_event_type_id'], + ['known_event_type.id'], ), + sa.PrimaryKeyConstraint('id')) with op.batch_alter_table('chart', copy_from=known_event_table) as batch_op: batch_op.create_foreign_key('known_event_user_id_fkey', 'users', ['user_id'], ['id']) diff --git a/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py b/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py index 5c921c6a98aaf..ace7845965bf6 100644 --- a/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py +++ b/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py @@ -25,6 +25,8 @@ Create Date: 2018-04-03 15:31:20.814328 """ +from alembic import op +import sqlalchemy as sa # revision identifiers, used by Alembic. revision = '86770d1215c0' @@ -32,10 +34,6 @@ branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa - - RESOURCE_TABLE = "kube_worker_uuid" diff --git a/airflow/migrations/versions/947454bf1dff_add_ti_job_id_index.py b/airflow/migrations/versions/947454bf1dff_add_ti_job_id_index.py index b821cacedc400..6ff41baa28c7a 100644 --- a/airflow/migrations/versions/947454bf1dff_add_ti_job_id_index.py +++ b/airflow/migrations/versions/947454bf1dff_add_ti_job_id_index.py @@ -24,6 +24,7 @@ Create Date: 2017-08-15 15:12:13.845074 """ +from alembic import op # revision identifiers, used by Alembic. revision = '947454bf1dff' @@ -31,9 +32,6 @@ branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa - def upgrade(): op.create_index('ti_job_id', 'task_instance', ['job_id'], unique=False) diff --git a/airflow/migrations/versions/9635ae0956e7_index_faskfail.py b/airflow/migrations/versions/9635ae0956e7_index_faskfail.py index da69846233e87..6b21c3474acc4 100644 --- a/airflow/migrations/versions/9635ae0956e7_index_faskfail.py +++ b/airflow/migrations/versions/9635ae0956e7_index_faskfail.py @@ -24,6 +24,7 @@ Create Date: 2018-06-17 21:40:01.963540 """ +from alembic import op # revision identifiers, used by Alembic. revision = '9635ae0956e7' @@ -31,9 +32,6 @@ branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa - def upgrade(): op.create_index('idx_task_fail_dag_task_date', 'task_fail', ['dag_id', 'task_id', 'execution_date'], unique=False) @@ -41,4 +39,3 @@ def upgrade(): def downgrade(): op.drop_index('idx_task_fail_dag_task_date', table_name='task_fail') - diff --git a/airflow/migrations/versions/bba5a7cfc896_add_a_column_to_track_the_encryption_.py b/airflow/migrations/versions/bba5a7cfc896_add_a_column_to_track_the_encryption_.py index a19eb58eb5c8d..503cd0b6f0406 100644 --- a/airflow/migrations/versions/bba5a7cfc896_add_a_column_to_track_the_encryption_.py +++ b/airflow/migrations/versions/bba5a7cfc896_add_a_column_to_track_the_encryption_.py @@ -25,18 +25,19 @@ """ +from alembic import op +import sqlalchemy as sa + # revision identifiers, used by Alembic. revision = 'bba5a7cfc896' down_revision = 'bbc73705a13e' branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa - def upgrade(): - op.add_column('connection', sa.Column('is_extra_encrypted', sa.Boolean,default=False)) + op.add_column('connection', + sa.Column('is_extra_encrypted', sa.Boolean, default=False)) def downgrade(): diff --git a/airflow/migrations/versions/bbc73705a13e_add_notification_sent_column_to_sla_miss.py b/airflow/migrations/versions/bbc73705a13e_add_notification_sent_column_to_sla_miss.py index 04cdc22d6cdab..9855a6d4daf3c 100644 --- a/airflow/migrations/versions/bbc73705a13e_add_notification_sent_column_to_sla_miss.py +++ b/airflow/migrations/versions/bbc73705a13e_add_notification_sent_column_to_sla_miss.py @@ -24,6 +24,8 @@ Create Date: 2016-01-14 18:05:54.871682 """ +from alembic import op +import sqlalchemy as sa # revision identifiers, used by Alembic. revision = 'bbc73705a13e' @@ -31,12 +33,9 @@ branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa - def upgrade(): - op.add_column('sla_miss', sa.Column('notification_sent', sa.Boolean,default=False)) + op.add_column('sla_miss', sa.Column('notification_sent', sa.Boolean, default=False)) def downgrade(): diff --git a/airflow/migrations/versions/bdaa763e6c56_make_xcom_value_column_a_large_binary.py b/airflow/migrations/versions/bdaa763e6c56_make_xcom_value_column_a_large_binary.py index a02eea5519cc6..a1a5270c7bff4 100644 --- a/airflow/migrations/versions/bdaa763e6c56_make_xcom_value_column_a_large_binary.py +++ b/airflow/migrations/versions/bdaa763e6c56_make_xcom_value_column_a_large_binary.py @@ -23,6 +23,9 @@ Create Date: 2017-08-14 16:06:31.568971 """ +from alembic import op +import dill +import sqlalchemy as sa # revision identifiers, used by Alembic. revision = 'bdaa763e6c56' @@ -30,15 +33,10 @@ branch_labels = None depends_on = None -from alembic import op -import dill -import sqlalchemy as sa - def upgrade(): # There can be data truncation here as LargeBinary can be smaller than the pickle # type. - # use batch_alter_table to support SQLite workaround with op.batch_alter_table("xcom") as batch_op: batch_op.alter_column('value', type_=sa.LargeBinary()) diff --git a/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py b/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py index 68228f7219a87..6155a40c81934 100644 --- a/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py +++ b/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py @@ -25,23 +25,22 @@ """ -# revision identifiers, used by Alembic. -revision = 'cc1e65623dc7' -down_revision = '127d2bf2dfa7' -branch_labels = None -depends_on = None - from alembic import op import sqlalchemy as sa from airflow import settings from airflow.models import DagBag from airflow.utils.sqlalchemy import UtcDateTime -from sqlalchemy import ( - Column, Integer, String) +from sqlalchemy import Column, Integer, String from sqlalchemy.engine.reflection import Inspector from sqlalchemy.ext.declarative import declarative_base +# revision identifiers, used by Alembic. +revision = 'cc1e65623dc7' +down_revision = '127d2bf2dfa7' +branch_labels = None +depends_on = None + Base = declarative_base() BATCH_SIZE = 5000 ID_LEN = 250 @@ -58,8 +57,7 @@ class TaskInstance(Base): def upgrade(): - op.add_column('task_instance', sa.Column('max_tries', sa.Integer, - server_default="-1")) + op.add_column('task_instance', sa.Column('max_tries', sa.Integer, server_default="-1")) # Check if table task_instance exist before data migration. This check is # needed for database that does not create table until migration finishes. # Checking task_instance table exists prevent the error of querying @@ -129,7 +127,7 @@ def downgrade(): # max number of self retry (task.retries) minus number of # times left for task instance to try the task. ti.try_number = max(0, task.retries - (ti.max_tries - - ti.try_number)) + ti.try_number)) ti.max_tries = -1 session.merge(ti) session.commit() diff --git a/airflow/migrations/versions/d2ae31099d61_increase_text_size_for_mysql.py b/airflow/migrations/versions/d2ae31099d61_increase_text_size_for_mysql.py index 5ecb0d5c72357..db5afaf023e54 100644 --- a/airflow/migrations/versions/d2ae31099d61_increase_text_size_for_mysql.py +++ b/airflow/migrations/versions/d2ae31099d61_increase_text_size_for_mysql.py @@ -23,6 +23,9 @@ Create Date: 2017-08-18 17:07:16.686130 """ +from alembic import op +from sqlalchemy.dialects import mysql +from alembic import context # revision identifiers, used by Alembic. revision = 'd2ae31099d61' @@ -30,11 +33,6 @@ branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa -from sqlalchemy.dialects import mysql -from alembic import context - def upgrade(): if context.config.get_main_option('sqlalchemy.url').startswith('mysql'): diff --git a/airflow/migrations/versions/e3a246e0dc1_current_schema.py b/airflow/migrations/versions/e3a246e0dc1_current_schema.py index cfa4147dd925d..cbf98976458d4 100644 --- a/airflow/migrations/versions/e3a246e0dc1_current_schema.py +++ b/airflow/migrations/versions/e3a246e0dc1_current_schema.py @@ -25,17 +25,17 @@ """ +from alembic import op +import sqlalchemy as sa +from sqlalchemy import func +from sqlalchemy.engine.reflection import Inspector + # revision identifiers, used by Alembic. revision = 'e3a246e0dc1' down_revision = None branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa -from sqlalchemy import func -from sqlalchemy.engine.reflection import Inspector - def upgrade(): conn = op.get_bind() diff --git a/airflow/migrations/versions/f23433877c24_fix_mysql_not_null_constraint.py b/airflow/migrations/versions/f23433877c24_fix_mysql_not_null_constraint.py index 44edeef0695a7..3e643f629dfcd 100644 --- a/airflow/migrations/versions/f23433877c24_fix_mysql_not_null_constraint.py +++ b/airflow/migrations/versions/f23433877c24_fix_mysql_not_null_constraint.py @@ -24,6 +24,8 @@ Create Date: 2018-06-17 10:16:31.412131 """ +from alembic import op +from sqlalchemy.dialects import mysql # revision identifiers, used by Alembic. revision = 'f23433877c24' @@ -31,9 +33,6 @@ branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa -from sqlalchemy.dialects import mysql def upgrade(): conn = op.get_bind() @@ -51,4 +50,3 @@ def downgrade(): op.alter_column('xcom', 'timestamp', existing_type=mysql.TIMESTAMP(fsp=6), nullable=True) op.alter_column('xcom', 'execution_date', existing_type=mysql.TIMESTAMP(fsp=6), nullable=True) op.alter_column('task_fail', 'execution_date', existing_type=mysql.TIMESTAMP(fsp=6), nullable=True) - diff --git a/airflow/migrations/versions/f2ca10b85618_add_dag_stats_table.py b/airflow/migrations/versions/f2ca10b85618_add_dag_stats_table.py index 3e3d40abc4384..e14b4b8025109 100644 --- a/airflow/migrations/versions/f2ca10b85618_add_dag_stats_table.py +++ b/airflow/migrations/versions/f2ca10b85618_add_dag_stats_table.py @@ -23,6 +23,8 @@ Create Date: 2016-07-20 15:08:28.247537 """ +from alembic import op +import sqlalchemy as sa # revision identifiers, used by Alembic. revision = 'f2ca10b85618' @@ -30,9 +32,6 @@ branch_labels = None depends_on = None -from alembic import op -import sqlalchemy as sa - def upgrade(): op.create_table('dag_stats', diff --git a/airflow/operators/__init__.py b/airflow/operators/__init__.py index c8cc981e936e7..4e2060baaec1c 100644 --- a/airflow/operators/__init__.py +++ b/airflow/operators/__init__.py @@ -19,7 +19,7 @@ import sys import os -from airflow.models import BaseOperator +from airflow.models import BaseOperator # noqa: F401 # ------------------------------------------------------------------------ # diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py index 216b2ed62667f..3393de0311491 100644 --- a/airflow/utils/dates.py +++ b/airflow/utils/dates.py @@ -24,12 +24,11 @@ from airflow.utils import timezone from datetime import datetime, timedelta -from dateutil.relativedelta import relativedelta # for doctest +from dateutil.relativedelta import relativedelta # flake8: noqa: F401 for doctest import six from croniter import croniter - cron_presets = { '@hourly': '0 * * * *', '@daily': '0 0 * * *', @@ -39,11 +38,7 @@ } -def date_range( - start_date, - end_date=None, - num=None, - delta=None): +def date_range(start_date, end_date=None, num=None, delta=None): """ Get a set of dates as a list based on a start, end and delta, delta can be something that can be added to ``datetime.datetime`` @@ -181,8 +176,8 @@ def round_time(dt, delta, start_date=timezone.make_aware(datetime.min)): # Check if start_date + (lower + 1)*delta or # start_date + lower*delta is closer to dt and return the solution if ( - (start_date + (lower + 1) * delta) - dt <= - dt - (start_date + lower * delta)): + (start_date + (lower + 1) * delta) - dt <= + dt - (start_date + lower * delta)): return start_date + (lower + 1) * delta else: return start_date + lower * delta diff --git a/airflow/utils/decorators.py b/airflow/utils/decorators.py index f1f0ea9a425e9..4bc8e0debdaaf 100644 --- a/airflow/utils/decorators.py +++ b/airflow/utils/decorators.py @@ -100,5 +100,6 @@ def wrapper(*args, **kwargs): return wrapper if 'BUILDING_AIRFLOW_DOCS' in os.environ: + # flake8: noqa: F811 # Monkey patch hook to get good function headers while building docs apply_defaults = lambda x: x diff --git a/airflow/utils/email.py b/airflow/utils/email.py index 08347a515c577..f4038715ac882 100644 --- a/airflow/utils/email.py +++ b/airflow/utils/email.py @@ -22,7 +22,6 @@ from __future__ import print_function from __future__ import unicode_literals -from builtins import str from past.builtins import basestring import importlib @@ -62,13 +61,13 @@ def send_email_smtp(to, subject, html_content, files=None, >>> send_email('test@example.com', 'foo', 'Foo bar', ['/dev/null'], dryrun=True) """ - SMTP_MAIL_FROM = configuration.conf.get('smtp', 'SMTP_MAIL_FROM') + smtp_mail_from = configuration.conf.get('smtp', 'SMTP_MAIL_FROM') to = get_email_address_list(to) msg = MIMEMultipart(mime_subtype) msg['Subject'] = subject - msg['From'] = SMTP_MAIL_FROM + msg['From'] = smtp_mail_from msg['To'] = ", ".join(to) recipients = to if cc: @@ -96,7 +95,7 @@ def send_email_smtp(to, subject, html_content, files=None, part['Content-ID'] = '<%s>' % basename msg.attach(part) - send_MIME_email(SMTP_MAIL_FROM, recipients, msg, dryrun) + send_MIME_email(smtp_mail_from, recipients, msg, dryrun) def send_MIME_email(e_from, e_to, mime_msg, dryrun=False): diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py index 45d0217e230ae..d6b1d93b38263 100644 --- a/airflow/utils/helpers.py +++ b/airflow/utils/helpers.py @@ -32,7 +32,6 @@ import os import re import signal -import subprocess import sys import warnings @@ -226,6 +225,7 @@ def reap_process_group(pid, log, sig=signal.SIGTERM, :param sig: signal type :param timeout: how much time a process has to terminate """ + def on_terminate(p): log.info("Process %s (%s) terminated with exit code %s", p, p.pid, p.returncode) diff --git a/airflow/utils/log/es_task_handler.py b/airflow/utils/log/es_task_handler.py index d74aabfbaead6..16372c0600bb5 100644 --- a/airflow/utils/log/es_task_handler.py +++ b/airflow/utils/log/es_task_handler.py @@ -17,7 +17,7 @@ # specific language governing permissions and limitations # under the License. -# Using `from elasticsearch import *` would break elasticseach mocking used in unit test. +# Using `from elasticsearch import *` would break elasticsearch mocking used in unit test. import elasticsearch import pendulum from elasticsearch_dsl import Search diff --git a/airflow/www/app.py b/airflow/www/app.py index f7976b0dd52f6..0e6a2a0c23aa3 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -18,7 +18,6 @@ # under the License. # import six -import os from flask import Flask from flask_admin import Admin, base @@ -64,8 +63,8 @@ def create_app(config=None, testing=False): api.load_auth() api.api_auth.init_app(app) - cache = Cache( - app=app, config={'CACHE_TYPE': 'filesystem', 'CACHE_DIR': '/tmp'}) + # flake8: noqa: F841 + cache = Cache(app=app, config={'CACHE_TYPE': 'filesystem', 'CACHE_DIR': '/tmp'}) app.register_blueprint(routes) diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 0c4f4b05d6128..9ce114d5eda3e 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -17,11 +17,11 @@ # specific language governing permissions and limitations # under the License. # +# flake8: noqa: E402 import inspect from future import standard_library standard_library.install_aliases() -from builtins import str -from builtins import object +from builtins import str, object from cgi import escape from io import BytesIO as IO @@ -29,13 +29,13 @@ import gzip import json import time +import wtforms +from wtforms.compat import text_type from flask import after_this_request, request, Response -from flask_admin.contrib.sqla.filters import FilterConverter from flask_admin.model import filters +from flask_admin.contrib.sqla.filters import FilterConverter from flask_login import current_user -import wtforms -from wtforms.compat import text_type from airflow import configuration, models, settings from airflow.utils.db import create_session diff --git a/airflow/www/views.py b/airflow/www/views.py index 0c0dcff801a59..4401e15702668 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -22,7 +22,6 @@ import codecs import copy import datetime as dt -import inspect import itertools import json import logging @@ -365,6 +364,7 @@ def get_date_time_num_runs_dag_runs_form_data(request, session, dag): 'dr_state': dr_state, } + class Airflow(BaseView): def is_visible(self): return False @@ -481,8 +481,6 @@ def chart_data(self): else: if chart.sql_layout == 'series': # User provides columns (series, x, y) - xaxis_label = df.columns[1] - yaxis_label = df.columns[2] df[df.columns[2]] = df[df.columns[2]].astype(np.float) df = df.pivot_table( index=df.columns[1], @@ -490,8 +488,6 @@ def chart_data(self): values=df.columns[2], aggfunc=np.sum) else: # User provides columns (x, y, metric1, metric2, ...) - xaxis_label = df.columns[0] - yaxis_label = 'y' df.index = df[df.columns[0]] df = df.sort(df.columns[0]) del df[df.columns[0]] @@ -599,8 +595,8 @@ def task_stats(self, session=None): session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date')) .join(Dag, Dag.dag_id == DagRun.dag_id) .filter(DagRun.state != State.RUNNING) - .filter(Dag.is_active == True) - .filter(Dag.is_subdag == False) + .filter(Dag.is_active == True) # noqa: E712 + .filter(Dag.is_subdag == False) # noqa: E712 .group_by(DagRun.dag_id) .subquery('last_dag_run') ) @@ -608,8 +604,8 @@ def task_stats(self, session=None): session.query(DagRun.dag_id, DagRun.execution_date) .join(Dag, Dag.dag_id == DagRun.dag_id) .filter(DagRun.state == State.RUNNING) - .filter(Dag.is_active == True) - .filter(Dag.is_subdag == False) + .filter(Dag.is_active == True) # noqa: E712 + .filter(Dag.is_subdag == False) # noqa: E712 .subquery('running_dag_run') ) @@ -883,7 +879,7 @@ def task(self): for attr_name in dir(ti): if not attr_name.startswith('_'): attr = getattr(ti, attr_name) - if type(attr) != type(self.task): + if type(attr) != type(self.task): # noqa: E721 ti_attrs.append((attr_name, str(attr))) task_attrs = [] @@ -891,7 +887,7 @@ def task(self): if not attr_name.startswith('_'): attr = getattr(task, attr_name) if type(attr) != type(self.task) and \ - attr_name not in attr_renderer: + attr_name not in attr_renderer: # noqa: E721 task_attrs.append((attr_name, str(attr))) # Color coding the special attributes that are code @@ -1166,7 +1162,6 @@ def clear(self): @wwwutils.notify_owner def dagrun_clear(self): dag_id = request.args.get('dag_id') - task_id = request.args.get('task_id') origin = request.args.get('origin') execution_date = request.args.get('execution_date') confirmed = request.args.get('confirmed') == "true" diff --git a/airflow/www_rbac/app.py b/airflow/www_rbac/app.py index b319426aa95a7..321185ee9b56e 100644 --- a/airflow/www_rbac/app.py +++ b/airflow/www_rbac/app.py @@ -19,7 +19,6 @@ # import socket import six -import os from flask import Flask from flask_appbuilder import AppBuilder, SQLA @@ -59,7 +58,8 @@ def create_app(config=None, session=None, testing=False, app_name="Airflow"): api.load_auth() api.api_auth.init_app(app) - cache = Cache(app=app, config={'CACHE_TYPE': 'filesystem', 'CACHE_DIR': '/tmp'}) # noqa + # flake8: noqa: F841 + cache = Cache(app=app, config={'CACHE_TYPE': 'filesystem', 'CACHE_DIR': '/tmp'}) from airflow.www_rbac.blueprints import routes app.register_blueprint(routes) diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py index 629f488fc7218..7a2a2d31244c1 100644 --- a/airflow/www_rbac/views.py +++ b/airflow/www_rbac/views.py @@ -35,7 +35,7 @@ import pendulum import sqlalchemy as sqla from flask import ( - g, redirect, request, Markup, Response, render_template, + redirect, request, Markup, Response, render_template, make_response, flash, jsonify) from flask._compat import PY2 from flask_appbuilder import BaseView, ModelView, expose, has_access diff --git a/dags/test_dag.py b/dags/test_dag.py index 3c385bbbefe43..a133dd5a1263b 100644 --- a/dags/test_dag.py +++ b/dags/test_dag.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,7 +23,9 @@ from datetime import datetime, timedelta now = datetime.now() -now_to_the_hour = (now - timedelta(0, 0, 0, 0, 0, 3)).replace(minute=0, second=0, microsecond=0) +now_to_the_hour = ( + now - timedelta(0, 0, 0, 0, 0, 3) +).replace(minute=0, second=0, microsecond=0) START_DATE = now_to_the_hour DAG_NAME = 'test_dag_v1'