diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b0268404cf479..3203fa3109ed0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -145,6 +145,7 @@ jobs: postgresVersions: ${{ steps.selective-checks.outputs.postgres-versions }} defaultPostgresVersion: ${{ steps.selective-checks.outputs.default-postgres-version }} mysqlVersions: ${{ steps.selective-checks.outputs.mysql-versions }} + mssqlVersions: ${{ steps.selective-checks.outputs.mssql-versions }} defaultMySQLVersion: ${{ steps.selective-checks.outputs.default-mysql-version }} helmVersions: ${{ steps.selective-checks.outputs.helm-versions }} defaultHelmVersion: ${{ steps.selective-checks.outputs.default-helm-version }} @@ -153,6 +154,7 @@ jobs: testTypes: ${{ steps.selective-checks.outputs.test-types }} postgresExclude: ${{ steps.selective-checks.outputs.postgres-exclude }} mysqlExclude: ${{ steps.selective-checks.outputs.mysql-exclude }} + mssqlExclude: ${{ steps.selective-checks.outputs.mssql-exclude }} sqliteExclude: ${{ steps.selective-checks.outputs.sqlite-exclude }} run-tests: ${{ steps.selective-checks.outputs.run-tests }} run-ui-tests: ${{ steps.selective-checks.outputs.run-ui-tests }} @@ -762,6 +764,62 @@ ${{ hashFiles('.pre-commit-config.yaml') }}" path: "./files/coverage*.xml" retention-days: 7 + tests-mssql: + timeout-minutes: 130 + name: > + MSSQL${{matrix.mssql-version}}, Py${{matrix.python-version}}: ${{needs.build-info.outputs.testTypes}} + runs-on: ${{ fromJson(needs.build-info.outputs.runsOn) }} + needs: [build-info, ci-images] + strategy: + matrix: + python-version: ${{ fromJson(needs.build-info.outputs.pythonVersions) }} + mssql-version: ${{ fromJson(needs.build-info.outputs.mssqlVersions) }} + exclude: ${{ fromJson(needs.build-info.outputs.mssqlExclude) }} + fail-fast: false + env: + RUNS_ON: ${{ fromJson(needs.build-info.outputs.runsOn) }} + BACKEND: mssql + PYTHON_MAJOR_MINOR_VERSION: ${{ matrix.python-version }} + MSSQL_VERSION: ${{ matrix.mssql-version }} + TEST_TYPES: "${{needs.build-info.outputs.testTypes}}" + GITHUB_REGISTRY: ${{ needs.ci-images.outputs.githubRegistry }} + if: needs.build-info.outputs.run-tests == 'true' + steps: + - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" + uses: actions/checkout@v2 + with: + persist-credentials: false + - name: "Setup python" + uses: actions/setup-python@v2 + with: + python-version: ${{ env.PYTHON_MAJOR_MINOR_VERSION }} + - name: "Free space" + run: ./scripts/ci/tools/ci_free_space_on_ci.sh + - name: "Prepare CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}" + run: ./scripts/ci/images/ci_prepare_ci_image_on_ci.sh + - name: "Tests: ${{needs.build-info.outputs.testTypes}}" + run: ./scripts/ci/testing/ci_run_airflow_testing.sh + - name: "Upload airflow logs" + uses: actions/upload-artifact@v2 + if: failure() + with: + name: airflow-logs-${{matrix.python-version}}-${{matrix.mssql-version}} + path: "./files/airflow_logs*" + retention-days: 7 + - name: "Upload container logs" + uses: actions/upload-artifact@v2 + if: failure() + with: + name: container-logs-mssql-${{matrix.python-version}}-${{matrix.mssql-version}} + path: "./files/container_logs*" + retention-days: 7 + - name: "Upload artifact for coverage" + uses: actions/upload-artifact@v2 + with: + name: coverage-mssql-${{matrix.python-version}}-${{matrix.mssql-version}} + path: "./files/coverage*.xml" + retention-days: 7 + tests-sqlite: timeout-minutes: 130 name: > @@ -898,6 +956,7 @@ ${{ hashFiles('.pre-commit-config.yaml') }}" - 
tests-postgres - tests-sqlite - tests-mysql + - tests-mssql - tests-quarantined env: RUNS_ON: ${{ fromJson(needs.build-info.outputs.runsOn) }} @@ -1046,6 +1105,7 @@ ${{ hashFiles('.pre-commit-config.yaml') }}" - tests-sqlite - tests-postgres - tests-mysql + - tests-mssql - tests-kubernetes - prod-images - docs @@ -1107,6 +1167,7 @@ ${{ hashFiles('.pre-commit-config.yaml') }}" - tests-sqlite - tests-postgres - tests-mysql + - tests-mssql - tests-kubernetes - ci-images - docs @@ -1151,6 +1212,7 @@ ${{ hashFiles('.pre-commit-config.yaml') }}" - static-checks-pylint - tests-sqlite - tests-mysql + - tests-mssql - tests-postgres - tests-kubernetes env: @@ -1221,6 +1283,7 @@ ${{ hashFiles('.pre-commit-config.yaml') }}" - tests-sqlite - tests-postgres - tests-mysql + - tests-mssql - tests-kubernetes - constraints - prepare-test-provider-packages-wheel diff --git a/BREEZE.rst b/BREEZE.rst index ed2b4fda352d2..2f0510a015ca2 100644 --- a/BREEZE.rst +++ b/BREEZE.rst @@ -1883,7 +1883,7 @@ This is the current syntax for `./breeze <./breeze>`_: Backend to use for tests - it determines which database is used. One of: - sqlite mysql postgres + sqlite mysql postgres mssql Default: sqlite @@ -2349,7 +2349,7 @@ This is the current syntax for `./breeze <./breeze>`_: Backend to use for tests - it determines which database is used. One of: - sqlite mysql postgres + sqlite mysql postgres mssql Default: sqlite diff --git a/Dockerfile.ci b/Dockerfile.ci index 92b6a456b0269..14e4294c6b029 100644 --- a/Dockerfile.ci +++ b/Dockerfile.ci @@ -161,6 +161,17 @@ RUN mkdir -pv /usr/share/man/man1 \ && apt-get autoremove -yqq --purge \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* \ + && curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - \ + && curl https://packages.microsoft.com/config/debian/9/prod.list > /etc/apt/sources.list.d/mssql-release.list \ + && apt-get update -yqq \ + && apt-get upgrade -yqq \ + && ACCEPT_EULA=Y apt-get -yqq install -y --no-install-recommends \ + gcc \ + unixodbc-dev \ + g++ \ + msodbcsql17 \ + mssql-tools \ + && rm -rf /var/lib/apt/lists/* \ && curl https://download.docker.com/linux/static/stable/x86_64/docker-${DOCKER_CLI_VERSION}.tgz \ | tar -C /usr/bin --strip-components=1 -xvzf - docker/docker diff --git a/README.md b/README.md index c9a8c866373d5..d0d2e173a0fa8 100644 --- a/README.md +++ b/README.md @@ -97,13 +97,14 @@ We **highly** recommend upgrading to the latest Airflow major release at the ear Apache Airflow is tested with: -| | Master version (dev) | Stable version (2.0.2) | Previous version (1.10.15) | -| ------------ | ------------------------- | ------------------------ | ------------------------- | -| Python | 3.6, 3.7, 3.8 | 3.6, 3.7, 3.8 | 2.7, 3.5, 3.6, 3.7, 3.8 | -| Kubernetes | 1.20, 1.19, 1.18 | 1.20, 1.19, 1.18 | 1.18, 1.17, 1.16 | -| PostgreSQL | 9.6, 10, 11, 12, 13 | 9.6, 10, 11, 12, 13 | 9.6, 10, 11, 12, 13 | -| MySQL | 5.7, 8 | 5.7, 8 | 5.6, 5.7 | -| SQLite | 3.15.0+ | 3.15.0+ | 3.15.0+ | +| | Master version (dev) | Stable version (2.0.2) | Previous version (1.10.15) | +| -------------------- | ------------------------- | ------------------------ | ------------------------- | +| Python | 3.6, 3.7, 3.8 | 3.6, 3.7, 3.8 | 2.7, 3.5, 3.6, 3.7, 3.8 | +| Kubernetes | 1.20, 1.19, 1.18 | 1.20, 1.19, 1.18 | 1.18, 1.17, 1.16 | +| PostgreSQL | 9.6, 10, 11, 12, 13 | 9.6, 10, 11, 12, 13 | 9.6, 10, 11, 12, 13 | +| MySQL | 5.7, 8 | 5.7, 8 | 5.6, 5.7 | +| SQLite | 3.15.0+ | 3.15.0+ | 3.15.0+ | +| MSSQL(Experimental) | 2017,2019 | | | **Note:** MySQL 5.x versions 
are unable to or have limitations with running multiple schedulers -- please see the [Scheduler docs](https://airflow.apache.org/docs/apache-airflow/stable/scheduler.html). diff --git a/airflow/api_connexion/parameters.py b/airflow/api_connexion/parameters.py index 8e06301676900..1dac50b351ebf 100644 --- a/airflow/api_connexion/parameters.py +++ b/airflow/api_connexion/parameters.py @@ -18,7 +18,7 @@ from typing import Callable, Dict, TypeVar, cast from pendulum.parsing import ParserError -from sqlalchemy import asc, desc +from sqlalchemy import text from airflow.api_connexion.exceptions import BadRequest from airflow.configuration import conf @@ -97,11 +97,10 @@ def apply_sorting(query, order_by, to_replace=None, allowed_attrs=None): detail=f"Ordering with '{lstriped_orderby}' is disallowed or " f"the attribute does not exist on the model" ) + if to_replace: + lstriped_orderby = to_replace.get(lstriped_orderby, lstriped_orderby) if order_by[0] == "-": - func = desc - order_by = lstriped_orderby + order_by = f"{lstriped_orderby} desc" else: - func = asc - if to_replace: - order_by = to_replace.get(order_by, order_by) - return query.order_by(func(order_by)) + order_by = f"{lstriped_orderby} asc" + return query.order_by(text(order_by)) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index b400ae92e6c29..bf4ad42c23b3f 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -37,6 +37,7 @@ from sqlalchemy.exc import OperationalError from sqlalchemy.orm import load_only, selectinload from sqlalchemy.orm.session import Session, make_transient +from sqlalchemy.sql import expression from airflow import models, settings from airflow.configuration import conf @@ -1067,15 +1068,15 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session = task_instance_str = "\n\t".join(repr(x) for x in executable_tis) self.log.info("Setting the following tasks to queued state:\n\t%s", task_instance_str) - - # set TIs to queued state - filter_for_tis = TI.filter_for_tis(executable_tis) - session.query(TI).filter(filter_for_tis).update( - # TODO[ha]: should we use func.now()? How does that work with DB timezone on mysql when it's not - # UTC? - {TI.state: State.QUEUED, TI.queued_dttm: timezone.utcnow(), TI.queued_by_job_id: self.id}, - synchronize_session=False, - ) + if len(executable_tis) > 0: + # set TIs to queued state + filter_for_tis = TI.filter_for_tis(executable_tis) + session.query(TI).filter(filter_for_tis).update( + # TODO[ha]: should we use func.now()? How does that work with DB timezone + # on mysql when it's not UTC? 
+ {TI.state: State.QUEUED, TI.queued_dttm: timezone.utcnow(), TI.queued_by_job_id: self.id}, + synchronize_session=False, + ) for ti in executable_tis: make_transient(ti) @@ -1580,14 +1581,24 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) -> # as DagModel.dag_id and DagModel.next_dagrun # This list is used to verify if the DagRun already exist so that we don't attempt to create # duplicate dag runs - active_dagruns = ( - session.query(DagRun.dag_id, DagRun.execution_date) - .filter( - tuple_(DagRun.dag_id, DagRun.execution_date).in_( - [(dm.dag_id, dm.next_dagrun) for dm in dag_models] - ) + + if session.bind.dialect.name == 'mssql': + active_dagruns_filter = or_( + *[ + and_( + DagRun.dag_id == dm.dag_id, + DagRun.execution_date == dm.next_dagrun, + ) + for dm in dag_models + ] ) - .all() + else: + active_dagruns_filter = tuple_(DagRun.dag_id, DagRun.execution_date).in_( + [(dm.dag_id, dm.next_dagrun) for dm in dag_models] + ) + + active_dagruns = ( + session.query(DagRun.dag_id, DagRun.execution_date).filter(active_dagruns_filter).all() ) for dag_model in dag_models: @@ -1644,7 +1655,7 @@ def _update_dag_next_dagruns(self, dag_models: Iterable[DagModel], session: Sess .filter( DagRun.dag_id.in_([o.dag_id for o in dag_models]), DagRun.state == State.RUNNING, # pylint: disable=comparison-with-callable - DagRun.external_trigger.is_(False), + DagRun.external_trigger == expression.false(), ) .group_by(DagRun.dag_id) .all() diff --git a/airflow/migrations/versions/83f031fd9f1c_improve_mssql_compatibility.py b/airflow/migrations/versions/83f031fd9f1c_improve_mssql_compatibility.py new file mode 100644 index 0000000000000..ce5f3fc57b06b --- /dev/null +++ b/airflow/migrations/versions/83f031fd9f1c_improve_mssql_compatibility.py @@ -0,0 +1,257 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""improve mssql compatibility + +Revision ID: 83f031fd9f1c +Revises: a13f7613ad25 +Create Date: 2021-04-06 12:22:02.197726 + +""" + +from collections import defaultdict + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import mssql + +# revision identifiers, used by Alembic. +revision = '83f031fd9f1c' +down_revision = 'a13f7613ad25' +branch_labels = None +depends_on = None + + +def is_table_empty(conn, table_name): + """ + This function checks if the MS SQL table is empty + + :param conn: SQL connection object + :param table_name: table name + :return: Boolean indicating whether the table is empty + """ + return conn.execute(f'select TOP 1 * from {table_name}').first() is None + + +def get_table_constraints(conn, table_name): + """ + This function returns primary and unique constraints + along with column names.
Some tables, like task_instance, are + missing the primary key constraint name and the name is + auto-generated by SQL Server, so this function helps to + retrieve any primary or unique constraint name. + + :param conn: sql connection object + :param table_name: table name + :return: a dictionary of ((constraint name, constraint type), column name) of table + :rtype: defaultdict(list) + """ + query = """SELECT tc.CONSTRAINT_NAME , tc.CONSTRAINT_TYPE, ccu.COLUMN_NAME + FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc + JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu ON ccu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME + WHERE tc.TABLE_NAME = '{table_name}' AND + (tc.CONSTRAINT_TYPE = 'PRIMARY KEY' or UPPER(tc.CONSTRAINT_TYPE) = 'UNIQUE') + """.format( + table_name=table_name + ) + result = conn.execute(query).fetchall() + constraint_dict = defaultdict(list) + for constraint, constraint_type, column in result: + constraint_dict[(constraint, constraint_type)].append(column) + return constraint_dict + + +def drop_column_constraints(operator, column_name, constraint_dict): + """ + Drop a primary key or unique constraint + + :param operator: batch_alter_table for the table + :param constraint_dict: a dictionary of ((constraint name, constraint type), column name) of table + """ + for constraint, columns in constraint_dict.items(): + if column_name in columns: + if constraint[1].lower().startswith("primary"): + operator.drop_constraint(constraint[0], type_='primary') + elif constraint[1].lower().startswith("unique"): + operator.drop_constraint(constraint[0], type_='unique') + + +def create_constraints(operator, column_name, constraint_dict): + """ + Create a primary key or unique constraint + + :param operator: batch_alter_table for the table + :param constraint_dict: a dictionary of ((constraint name, constraint type), column name) of table + """ + for constraint, columns in constraint_dict.items(): + if column_name in columns: + if constraint[1].lower().startswith("primary"): + operator.create_primary_key(constraint_name=constraint[0], columns=columns) + elif constraint[1].lower().startswith("unique"): + operator.create_unique_constraint(constraint_name=constraint[0], columns=columns) + + +def _use_date_time2(conn): + result = conn.execute( + """SELECT CASE WHEN CONVERT(VARCHAR(128), SERVERPROPERTY ('productversion')) + like '8%' THEN '2000' WHEN CONVERT(VARCHAR(128), SERVERPROPERTY ('productversion')) + like '9%' THEN '2005' ELSE '2005Plus' END AS MajorVersion""" + ).fetchone() + mssql_version = result[0] + return mssql_version not in ("2000", "2005") + + +def _is_timestamp(conn, table_name, column_name): + query = f"""SELECT + TYPE_NAME(C.USER_TYPE_ID) AS DATA_TYPE + FROM SYS.COLUMNS C + JOIN SYS.TYPES T + ON C.USER_TYPE_ID=T.USER_TYPE_ID + WHERE C.OBJECT_ID=OBJECT_ID('{table_name}') and C.NAME='{column_name}'; + """ + column_type = conn.execute(query).fetchone()[0] + return column_type == "timestamp" + + +def recreate_mssql_ts_column(conn, op, table_name, column_name): + """ + Drop the timestamp column and recreate it as + datetime or datetime2(6) + """ + if _is_timestamp(conn, table_name, column_name) and is_table_empty(conn, table_name): + with op.batch_alter_table(table_name) as batch_op: + constraint_dict = get_table_constraints(conn, table_name) + drop_column_constraints(batch_op, column_name, constraint_dict) + batch_op.drop_column(column_name=column_name) + if _use_date_time2(conn): + batch_op.add_column(sa.Column(column_name, mssql.DATETIME2(precision=6), nullable=False)) + else:
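+                # SQL Server 2000 and 2005 predate DATETIME2, so _use_date_time2() is False there and the column falls back to the lower-precision DATETIME type.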
batch_op.add_column(sa.Column(column_name, mssql.DATETIME, nullable=False)) + create_constraints(batch_op, column_name, constraint_dict) + + +def alter_mssql_datetime_column(conn, op, table_name, column_name, nullable): + """Update the datetime column to datetime2(6)""" + if _use_date_time2(conn): + op.alter_column( + table_name=table_name, + column_name=column_name, + type_=mssql.DATETIME2(precision=6), + nullable=nullable, + ) + + +def alter_mssql_datetime2_column(conn, op, table_name, column_name, nullable): + """Update the datetime2(6) column to datetime""" + if _use_date_time2(conn): + op.alter_column( + table_name=table_name, column_name=column_name, type_=mssql.DATETIME, nullable=nullable + ) + + +def _get_timestamp(conn): + if _use_date_time2(conn): + return mssql.DATETIME2(precision=6) + else: + return mssql.DATETIME + + +def upgrade(): + """Improve compatibility with MSSQL backend""" + conn = op.get_bind() + if conn.dialect.name != 'mssql': + return + recreate_mssql_ts_column(conn, op, 'dag_code', 'last_updated') + recreate_mssql_ts_column(conn, op, 'rendered_task_instance_fields', 'execution_date') + alter_mssql_datetime_column(conn, op, 'serialized_dag', 'last_updated', False) + op.alter_column(table_name="xcom", column_name="timestamp", type_=_get_timestamp(conn), nullable=False) + with op.batch_alter_table('task_reschedule') as task_reschedule_batch_op: + task_reschedule_batch_op.alter_column( + column_name='end_date', type_=_get_timestamp(conn), nullable=False + ) + task_reschedule_batch_op.alter_column( + column_name='reschedule_date', type_=_get_timestamp(conn), nullable=False + ) + task_reschedule_batch_op.alter_column( + column_name='start_date', type_=_get_timestamp(conn), nullable=False + ) + with op.batch_alter_table('task_fail') as task_fail_batch_op: + task_fail_batch_op.drop_index('idx_task_fail_dag_task_date') + task_fail_batch_op.alter_column( + column_name="execution_date", type_=_get_timestamp(conn), nullable=False + ) + task_fail_batch_op.create_index( + 'idx_task_fail_dag_task_date', ['dag_id', 'task_id', 'execution_date'], unique=False + ) + with op.batch_alter_table('task_instance') as task_instance_batch_op: + task_instance_batch_op.drop_index('ti_state_lkp') + task_instance_batch_op.create_index( + 'ti_state_lkp', ['dag_id', 'task_id', 'execution_date', 'state'], unique=False + ) + constraint_dict = get_table_constraints(conn, 'dag_run') + for constraint, columns in constraint_dict.items(): + if 'dag_id' in columns: + if constraint[1].lower().startswith("unique"): + op.drop_constraint(constraint[0], 'dag_run', type_='unique') + # create filtered indexes + conn.execute( + """CREATE UNIQUE NONCLUSTERED INDEX idx_not_null_dag_id_execution_date + ON dag_run(dag_id,execution_date) + WHERE dag_id IS NOT NULL and execution_date is not null""" + ) + conn.execute( + """CREATE UNIQUE NONCLUSTERED INDEX idx_not_null_dag_id_run_id + ON dag_run(dag_id,run_id) + WHERE dag_id IS NOT NULL and run_id is not null""" + ) + + +def downgrade(): + """Reverse MSSQL backend compatibility improvements""" + conn = op.get_bind() + if conn.dialect.name != 'mssql': + return + alter_mssql_datetime2_column(conn, op, 'serialized_dag', 'last_updated', False) + op.alter_column(table_name="xcom", column_name="timestamp", type_=_get_timestamp(conn), nullable=True) + with op.batch_alter_table('task_reschedule') as task_reschedule_batch_op: + task_reschedule_batch_op.alter_column( + column_name='end_date', type_=_get_timestamp(conn), nullable=True + ) + task_reschedule_batch_op.alter_column( 
+ column_name='reschedule_date', type_=_get_timestamp(conn), nullable=True + ) + task_reschedule_batch_op.alter_column( + column_name='start_date', type_=_get_timestamp(conn), nullable=True + ) + with op.batch_alter_table('task_fail') as task_fail_batch_op: + task_fail_batch_op.drop_index('idx_task_fail_dag_task_date') + task_fail_batch_op.alter_column( + column_name="execution_date", type_=_get_timestamp(conn), nullable=False + ) + task_fail_batch_op.create_index( + 'idx_task_fail_dag_task_date', ['dag_id', 'task_id', 'execution_date'], unique=False + ) + with op.batch_alter_table('task_instance') as task_instance_batch_op: + task_instance_batch_op.drop_index('ti_state_lkp') + task_instance_batch_op.create_index( + 'ti_state_lkp', ['dag_id', 'task_id', 'execution_date'], unique=False + ) + op.create_unique_constraint('UQ__dag_run__dag_id_run_id', 'dag_run', ['dag_id', 'run_id']) + op.create_unique_constraint('UQ__dag_run__dag_id_execution_date', 'dag_run', ['dag_id', 'execution_date']) + op.drop_index('idx_not_null_dag_id_execution_date', table_name='dag_run') + op.drop_index('idx_not_null_dag_id_run_id', table_name='dag_run') diff --git a/airflow/migrations/versions/8d48763f6d53_add_unique_constraint_to_conn_id.py b/airflow/migrations/versions/8d48763f6d53_add_unique_constraint_to_conn_id.py index 2c743e4813185..44be9880ec5ee 100644 --- a/airflow/migrations/versions/8d48763f6d53_add_unique_constraint_to_conn_id.py +++ b/airflow/migrations/versions/8d48763f6d53_add_unique_constraint_to_conn_id.py @@ -38,9 +38,9 @@ def upgrade(): """Apply add unique constraint to conn_id and set it as non-nullable""" try: with op.batch_alter_table('connection') as batch_op: + batch_op.alter_column("conn_id", nullable=False, existing_type=sa.String(250)) batch_op.create_unique_constraint(constraint_name="unique_conn_id", columns=["conn_id"]) - batch_op.alter_column("conn_id", nullable=False, existing_type=sa.String(250)) except sa.exc.IntegrityError: raise Exception("Make sure there are no duplicate connections with the same conn_id or null values") diff --git a/airflow/migrations/versions/98271e7606e2_add_scheduling_decision_to_dagrun_and_.py b/airflow/migrations/versions/98271e7606e2_add_scheduling_decision_to_dagrun_and_.py index 8019aa2d91be5..d220d32b1653f 100644 --- a/airflow/migrations/versions/98271e7606e2_add_scheduling_decision_to_dagrun_and_.py +++ b/airflow/migrations/versions/98271e7606e2_add_scheduling_decision_to_dagrun_and_.py @@ -26,7 +26,7 @@ import sqlalchemy as sa from alembic import op -from sqlalchemy.dialects import mysql +from sqlalchemy.dialects import mssql, mysql # revision identifiers, used by Alembic. 
revision = '98271e7606e2' @@ -35,12 +35,32 @@ depends_on = None +def _use_date_time2(conn): + result = conn.execute( + """SELECT CASE WHEN CONVERT(VARCHAR(128), SERVERPROPERTY ('productversion')) + like '8%' THEN '2000' WHEN CONVERT(VARCHAR(128), SERVERPROPERTY ('productversion')) + like '9%' THEN '2005' ELSE '2005Plus' END AS MajorVersion""" + ).fetchone() + mssql_version = result[0] + return mssql_version not in ("2000", "2005") + + +def _get_timestamp(conn): + dialect_name = conn.dialect.name + if dialect_name == "mssql": + return mssql.DATETIME2(precision=6) if _use_date_time2(conn) else mssql.DATETIME + elif dialect_name != "mysql": + return sa.TIMESTAMP(timezone=True) + else: + return mysql.TIMESTAMP(fsp=6, timezone=True) + + def upgrade(): """Apply Add scheduling_decision to DagRun and DAG""" conn = op.get_bind() # pylint: disable=no-member - is_mysql = bool(conn.dialect.name == "mysql") is_sqlite = bool(conn.dialect.name == "sqlite") - timestamp = sa.TIMESTAMP(timezone=True) if not is_mysql else mysql.TIMESTAMP(fsp=6, timezone=True) + is_mssql = bool(conn.dialect.name == "mssql") + timestamp = _get_timestamp(conn) if is_sqlite: op.execute("PRAGMA foreign_keys=off") @@ -71,7 +91,7 @@ def upgrade(): op.execute( "UPDATE dag SET concurrency={}, has_task_concurrency_limits={} where concurrency IS NULL".format( - concurrency, 1 if is_sqlite else sa.true() + concurrency, 1 if is_sqlite or is_mssql else sa.true() ) ) diff --git a/airflow/migrations/versions/bbf4a7ad0465_remove_id_column_from_xcom.py b/airflow/migrations/versions/bbf4a7ad0465_remove_id_column_from_xcom.py index f063aeafd227d..045f5a63179fd 100644 --- a/airflow/migrations/versions/bbf4a7ad0465_remove_id_column_from_xcom.py +++ b/airflow/migrations/versions/bbf4a7ad0465_remove_id_column_from_xcom.py @@ -24,6 +24,8 @@ """ +from collections import defaultdict + from alembic import op from sqlalchemy import Column, Integer from sqlalchemy.engine.reflection import Inspector @@ -35,6 +37,64 @@ depends_on = None +def get_table_constraints(conn, table_name): + """ + This function return primary and unique constraint + along with column name. Some tables like `task_instance` + is missing the primary key constraint name and the name is + auto-generated by the SQL server. so this function helps to + retrieve any primary or unique constraint name. 
+ + :param conn: sql connection object + :param table_name: table name + :return: a dictionary of ((constraint name, constraint type), column name) of table + :rtype: defaultdict(list) + """ + query = """SELECT tc.CONSTRAINT_NAME , tc.CONSTRAINT_TYPE, ccu.COLUMN_NAME + FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc + JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu ON ccu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME + WHERE tc.TABLE_NAME = '{table_name}' AND + (tc.CONSTRAINT_TYPE = 'PRIMARY KEY' or UPPER(tc.CONSTRAINT_TYPE) = 'UNIQUE') + """.format( + table_name=table_name + ) + result = conn.execute(query).fetchall() + constraint_dict = defaultdict(list) + for constraint, constraint_type, column in result: + constraint_dict[(constraint, constraint_type)].append(column) + return constraint_dict + + +def drop_column_constraints(operator, column_name, constraint_dict): + """ + Drop a primary key or unique constraint + + :param operator: batch_alter_table for the table + :param constraint_dict: a dictionary of ((constraint name, constraint type), column name) of table + """ + for constraint, columns in constraint_dict.items(): + if column_name in columns: + if constraint[1].lower().startswith("primary"): + operator.drop_constraint(constraint[0], type_='primary') + elif constraint[1].lower().startswith("unique"): + operator.drop_constraint(constraint[0], type_='unique') + + +def create_constraints(operator, column_name, constraint_dict): + """ + Create a primary key or unique constraint + + :param operator: batch_alter_table for the table + :param constraint_dict: a dictionary of ((constraint name, constraint type), column name) of table + """ + for constraint, columns in constraint_dict.items(): + if column_name in columns: + if constraint[1].lower().startswith("primary"): + operator.create_primary_key(constraint_name=constraint[0], columns=columns) + elif constraint[1].lower().startswith("unique"): + operator.create_unique_constraint(constraint_name=constraint[0], columns=columns) + + def upgrade(): """Apply Remove id column from xcom""" conn = op.get_bind() @@ -43,9 +103,14 @@ def upgrade(): with op.batch_alter_table('xcom') as bop: xcom_columns = [col.get('name') for col in inspector.get_columns("xcom")] if "id" in xcom_columns: + if conn.dialect.name == 'mssql': + constraint_dict = get_table_constraints(conn, "xcom") + drop_column_constraints(bop, 'id', constraint_dict) bop.drop_column('id') bop.drop_index('idx_xcom_dag_task_date') - bop.create_primary_key('pk_xcom', ['dag_id', 'task_id', 'key', 'execution_date']) + # mssql doesn't allow primary keys with nullable columns + if conn.dialect.name != 'mssql': + bop.create_primary_key('pk_xcom', ['dag_id', 'task_id', 'key', 'execution_date']) def downgrade(): diff --git a/airflow/migrations/versions/e9304a3141f0_make_xcom_pkey_columns_non_nullable.py b/airflow/migrations/versions/e9304a3141f0_make_xcom_pkey_columns_non_nullable.py new file mode 100644 index 0000000000000..bde065b3e1ef4 --- /dev/null +++ b/airflow/migrations/versions/e9304a3141f0_make_xcom_pkey_columns_non_nullable.py @@ -0,0 +1,76 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""make xcom pkey columns non-nullable + +Revision ID: e9304a3141f0 +Revises: 83f031fd9f1c +Create Date: 2021-04-06 13:22:02.197726 + +""" +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import mssql, mysql + +from airflow.models.base import COLLATION_ARGS + +# revision identifiers, used by Alembic. +revision = 'e9304a3141f0' +down_revision = '83f031fd9f1c' +branch_labels = None +depends_on = None + + +def _use_date_time2(conn): + result = conn.execute( + """SELECT CASE WHEN CONVERT(VARCHAR(128), SERVERPROPERTY ('productversion')) + like '8%' THEN '2000' WHEN CONVERT(VARCHAR(128), SERVERPROPERTY ('productversion')) + like '9%' THEN '2005' ELSE '2005Plus' END AS MajorVersion""" + ).fetchone() + mssql_version = result[0] + return mssql_version not in ("2000", "2005") + + +def _get_timestamp(conn): + dialect_name = conn.dialect.name + if dialect_name == "mssql": + return mssql.DATETIME2(precision=6) if _use_date_time2(conn) else mssql.DATETIME + elif dialect_name == "mysql": + return mysql.TIMESTAMP(fsp=6, timezone=True) + else: + return sa.TIMESTAMP(timezone=True) + + +def upgrade(): + """Apply make xcom pkey columns non-nullable""" + conn = op.get_bind() + with op.batch_alter_table('xcom') as bop: + bop.alter_column("key", type_=sa.String(length=512, **COLLATION_ARGS), nullable=False) + bop.alter_column("execution_date", type_=_get_timestamp(conn), nullable=False) + if conn.dialect.name == 'mssql': + bop.create_primary_key('pk_xcom', ['dag_id', 'task_id', 'key', 'execution_date']) + + +def downgrade(): + """Unapply make xcom pkey columns non-nullable""" + conn = op.get_bind() + with op.batch_alter_table('xcom') as bop: + if conn.dialect.name == 'mssql': + bop.drop_constraint('pk_xcom', 'primary') + bop.alter_column("key", type_=sa.String(length=512, **COLLATION_ARGS), nullable=True) + bop.alter_column("execution_date", type_=_get_timestamp(conn), nullable=True) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 9f929bf2d264a..1cf3797d79c17 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -52,6 +52,7 @@ from sqlalchemy import Boolean, Column, ForeignKey, Index, Integer, String, Text, func, or_ from sqlalchemy.orm import backref, joinedload, relationship from sqlalchemy.orm.session import Session +from sqlalchemy.sql import expression import airflow.templates from airflow import settings, utils @@ -100,7 +101,7 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False): DR = DagRun query = session.query(DR).filter(DR.dag_id == dag_id) if not include_externally_triggered: - query = query.filter(DR.external_trigger == False) # noqa pylint: disable=singleton-comparison + query = query.filter(DR.external_trigger == expression.false()) query = query.order_by(DR.execution_date.desc()) return query.first() @@ -897,7 +898,9 @@ def get_num_active_runs(self, external_trigger=None, session=None): ) if external_trigger is not None: - query = query.filter(DagRun.external_trigger == external_trigger) + query = query.filter( + DagRun.external_trigger == (expression.true() if external_trigger else expression.false()) + ) 
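+            # Using expression.true() / expression.false() here (rendered as 1/0 on backends without a native boolean type) rather than .is_(True/False) keeps the comparison valid on MSSQL, which has no boolean literals.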
return query.scalar() @@ -1872,7 +1875,7 @@ def bulk_write_to_db(cls, dags: Collection["DAG"], session=None): .filter( DagRun.dag_id.in_(existing_dag_ids), DagRun.state == State.RUNNING, # pylint: disable=comparison-with-callable - DagRun.external_trigger.is_(False), + DagRun.external_trigger == expression.false(), ) .group_by(DagRun.dag_id) .all() @@ -2186,7 +2189,7 @@ def get_paused_dag_ids(dag_ids: List[str], session: Session = None) -> Set[str]: """ paused_dag_ids = ( session.query(DagModel.dag_id) - .filter(DagModel.is_paused.is_(True)) + .filter(DagModel.is_paused == expression.true()) .filter(DagModel.dag_id.in_(dag_ids)) .all() ) @@ -2270,8 +2273,8 @@ def dags_needing_dagruns(cls, session: Session): query = ( session.query(cls) .filter( - cls.is_paused.is_(False), - cls.is_active.is_(True), + cls.is_paused == expression.false(), + cls.is_active == expression.true(), cls.next_dagrun_create_after <= func.now(), ) .order_by(cls.next_dagrun_create_after) diff --git a/airflow/models/dagcode.py b/airflow/models/dagcode.py index ce65fc2f159ec..4295a5bffe095 100644 --- a/airflow/models/dagcode.py +++ b/airflow/models/dagcode.py @@ -20,7 +20,8 @@ from datetime import datetime from typing import Iterable, List, Optional -from sqlalchemy import BigInteger, Column, String, Text, exists +from sqlalchemy import BigInteger, Column, String, Text +from sqlalchemy.sql.expression import literal from airflow.exceptions import AirflowException, DagCodeNotFound from airflow.models.base import Base @@ -147,7 +148,7 @@ def has_dag(cls, fileloc: str, session=None) -> bool: :param session: ORM Session """ fileloc_hash = cls.dag_fileloc_hash(fileloc) - return session.query(exists().where(cls.fileloc_hash == fileloc_hash)).scalar() + return session.query(literal(True)).filter(cls.fileloc_hash == fileloc_hash).one_or_none() is not None @classmethod def get_code_by_fileloc(cls, fileloc: str) -> str: diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 6bd96f3aa7eee..e18351d3c1fb8 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -35,6 +35,7 @@ from sqlalchemy.ext.declarative import declared_attr from sqlalchemy.orm import backref, relationship, synonym from sqlalchemy.orm.session import Session +from sqlalchemy.sql import expression from airflow import settings from airflow.configuration import conf as airflow_conf @@ -211,8 +212,8 @@ def next_dagruns_to_examine( DagModel.dag_id == cls.dag_id, ) .filter( - DagModel.is_paused.is_(False), - DagModel.is_active.is_(True), + DagModel.is_paused == expression.false(), + DagModel.is_active == expression.true(), ) .order_by( nulls_first(cls.last_scheduling_decision, session=session), diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py index 81448b6e64b8d..d0db6e8b5b981 100644 --- a/airflow/models/serialized_dag.py +++ b/airflow/models/serialized_dag.py @@ -26,8 +26,7 @@ import sqlalchemy_jsonfield from sqlalchemy import BigInteger, Column, Index, String, and_ from sqlalchemy.orm import Session, backref, foreign, relationship -from sqlalchemy.sql import exists -from sqlalchemy.sql.expression import func +from sqlalchemy.sql.expression import func, literal from airflow.models.base import ID_LEN, Base from airflow.models.dag import DAG, DagModel @@ -117,14 +116,20 @@ def write_dag(cls, dag: DAG, min_update_interval: Optional[int] = None, session: # If Yes, does nothing # If No or the DAG does not exists, updates / writes Serialized DAG to DB if min_update_interval is not None: - if session.query( - 
exists().where( + if ( + session.query(literal(True)) + .filter( and_( cls.dag_id == dag.dag_id, (timezone.utcnow() - timedelta(seconds=min_update_interval)) < cls.last_updated, ) ) - ).scalar(): + .first() + is not None + ): + # TODO: .first() is not None can be changed to .scalar() once we update to sqlalchemy 1.4+ + # as the associated sqlalchemy bug for MySQL was fixed + # related issue : https://github.com/sqlalchemy/sqlalchemy/issues/5481 return False log.debug("Checking if DAG (%s) changed", dag.dag_id) @@ -217,7 +222,7 @@ def has_dag(cls, dag_id: str, session: Session = None) -> bool: :param dag_id: the DAG to check :param session: ORM Session """ - return session.query(exists().where(cls.dag_id == dag_id)).scalar() + return session.query(literal(True)).filter(cls.dag_id == dag_id).first() is not None @classmethod @provide_session @@ -313,7 +318,9 @@ def get_dag_dependencies(cls, session: Session = None) -> Dict[str, List['DagDep if session.bind.dialect.name in ["sqlite", "mysql"]: for row in session.query(cls.dag_id, func.json_extract(cls.data, "$.dag.dag_dependencies")).all(): dependencies[row[0]] = [DagDependency(**d) for d in json.loads(row[1])] - + elif session.bind.dialect.name == "mssql": + for row in session.query(cls.dag_id, func.json_query(cls.data, "$.dag.dag_dependencies")).all(): + dependencies[row[0]] = [DagDependency(**d) for d in json.loads(row[1])] else: for row in session.query( cls.dag_id, func.json_extract_path(cls.data, "dag", "dag_dependencies") diff --git a/airflow/sensors/smart_sensor.py b/airflow/sensors/smart_sensor.py index 6c17beb8912ad..9d0a28c65ae00 100644 --- a/airflow/sensors/smart_sensor.py +++ b/airflow/sensors/smart_sensor.py @@ -391,23 +391,34 @@ def _update_ti_hostname(self, sensor_works, session=None): :param session: The sqlalchemy session. """ TI = TaskInstance - ti_keys = [(x.dag_id, x.task_id, x.execution_date) for x in sensor_works] - def update_ti_hostname_with_count(count, ti_keys): + def update_ti_hostname_with_count(count, sensor_works): # Using or_ instead of in_ here to prevent from full table scan. 
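+            # MSSQL does not support tuple (row-value) IN comparisons, so for that backend the filter below falls back to an OR of per-key AND conditions instead of tuple_(...).in_(...).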
- tis = ( - session.query(TI) - .filter(or_(tuple_(TI.dag_id, TI.task_id, TI.execution_date) == ti_key for ti_key in ti_keys)) - .all() - ) + if session.bind.dialect.name == 'mssql': + ti_filter = or_( + and_( + TI.dag_id == ti_key.dag_id, + TI.task_id == ti_key.task_id, + TI.execution_date == ti_key.execution_date, + ) + for ti_key in sensor_works + ) + else: + ti_keys = [(x.dag_id, x.task_id, x.execution_date) for x in sensor_works] + ti_filter = or_( + tuple_(TI.dag_id, TI.task_id, TI.execution_date) == ti_key for ti_key in ti_keys + ) + tis = session.query(TI).filter(ti_filter).all() for ti in tis: ti.hostname = self.hostname session.commit() - return count + len(ti_keys) + return count + len(sensor_works) - count = helpers.reduce_in_chunks(update_ti_hostname_with_count, ti_keys, 0, self.max_tis_per_query) + count = helpers.reduce_in_chunks( + update_ti_hostname_with_count, sensor_works, 0, self.max_tis_per_query + ) if count: self.log.info("Updated hostname on %s tis.", count) diff --git a/airflow/www/security.py b/airflow/www/security.py index 8768587fdb769..9b391f79cc840 100644 --- a/airflow/www/security.py +++ b/airflow/www/security.py @@ -378,6 +378,14 @@ def is_dag_resource(self, resource_name): return True return resource_name.startswith(permissions.RESOURCE_DAG_PREFIX) + def _has_view_access(self, user, action, resource) -> bool: + """ + Overriding the method to ensure that it always returns a bool + _has_view_access can return NoneType which gives us + issues later on, this fixes that. + """ + return bool(super()._has_view_access(user, action, resource)) + def has_access(self, permission, resource, user=None) -> bool: """ Verify whether a given user could perform certain permission diff --git a/airflow/www/views.py b/airflow/www/views.py index 78dca2f010eb6..e90ec6bc821b3 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -76,7 +76,7 @@ from pendulum.datetime import DateTime from pygments import highlight, lexers from pygments.formatters import HtmlFormatter # noqa pylint: disable=no-name-in-module -from sqlalchemy import and_, desc, func, or_, union_all +from sqlalchemy import Date, and_, desc, func, or_, union_all from sqlalchemy.orm import joinedload from wtforms import SelectField, validators from wtforms.validators import InputRequired @@ -2090,6 +2090,14 @@ def tree(self): @action_logging # pylint: disable=too-many-locals def calendar(self): """Get DAG runs as calendar""" + + def _convert_to_date(session, column): + """Convert column to date.""" + if session.bind.dialect.name == 'mssql': + return column.cast(Date) + else: + return func.date(column) + dag_id = request.args.get('dag_id') dag = current_app.dag_bag.get_dag(dag_id) if not dag: @@ -2103,13 +2111,13 @@ def calendar(self): with create_session() as session: dag_states = ( session.query( - func.date(DagRun.execution_date).label('date'), + (_convert_to_date(session, DagRun.execution_date)).label('date'), DagRun.state, func.count('*').label('count'), ) .filter(DagRun.dag_id == dag.dag_id) - .group_by(func.date(DagRun.execution_date), DagRun.state) - .order_by(func.date(DagRun.execution_date).asc()) + .group_by(_convert_to_date(session, DagRun.execution_date), DagRun.state) + .order_by(_convert_to_date(session, DagRun.execution_date).asc()) .all() ) diff --git a/breeze b/breeze index 50f9e5510aba1..a838d65036109 100755 --- a/breeze +++ b/breeze @@ -3069,6 +3069,9 @@ function breeze::read_saved_environment_variables() { MYSQL_VERSION="${MYSQL_VERSION:=$(parameters::read_from_file MYSQL_VERSION)}" 
MYSQL_VERSION=${MYSQL_VERSION:=${_breeze_default_mysql_version}} + MSSQL_VERSION="${MSSQL_VERSION:=$(parameters::read_from_file MSSQL_VERSION)}" + MSSQL_VERSION=${MSSQL_VERSION:=${_breeze_default_mssql_version}} + # Here you read DockerHub user/account that you use # You can populate your own images in DockerHub this way and work with the, # You can override it with "--dockerhub-user" option and it will be stored in .build directory @@ -3102,6 +3105,7 @@ function breeze::read_saved_environment_variables() { # EXECUTOR # POSTGRES_VERSION # MYSQL_VERSION +# MSSQL_VERSION # DOCKERHUB_USER # DOCKERHUB_REPO # @@ -3135,6 +3139,7 @@ function breeze::check_and_save_all_params() { parameters::check_and_save_allowed_param "EXECUTOR" "Executors" "--executor" parameters::check_and_save_allowed_param "POSTGRES_VERSION" "Postgres version" "--postgres-version" parameters::check_and_save_allowed_param "MYSQL_VERSION" "Mysql version" "--mysql-version" + parameters::check_and_save_allowed_param "MSSQL_VERSION" "MSSql version" "--mssql-version" parameters::check_and_save_allowed_param "GITHUB_REGISTRY" "GitHub Registry" "--github-registry" parameters::check_allowed_param TEST_TYPE "Type of tests" "--test-type" @@ -3156,6 +3161,7 @@ function breeze::check_and_save_all_params() { # WEBSERVER_HOST_PORT # POSTGRES_HOST_PORT # MYSQL_HOST_PORT +# MSSQL_HOST_PORT # ####################################################################################################### function breeze::print_cheatsheet() { @@ -3188,6 +3194,7 @@ function breeze::print_cheatsheet() { echo " * ${FLOWER_HOST_PORT} -> forwarded to Flower dashboard -> airflow:5555" echo " * ${POSTGRES_HOST_PORT} -> forwarded to Postgres database -> postgres:5432" echo " * ${MYSQL_HOST_PORT} -> forwarded to MySQL database -> mysql:3306" + echo " * ${MSSQL_HOST_PORT} -> forwarded to MSSQL database -> mssql:1433" echo " * ${REDIS_HOST_PORT} -> forwarded to Redis broker -> redis:6379" echo echo " Here are links to those services that you can use on host:" diff --git a/breeze-complete b/breeze-complete index 6126ca4228c33..b0985eb95c0e1 100644 --- a/breeze-complete +++ b/breeze-complete @@ -24,7 +24,7 @@ # Those cannot be made read-only as the breeze-complete must be re-sourceable _breeze_allowed_python_major_minor_versions="3.6 3.7 3.8" -_breeze_allowed_backends="sqlite mysql postgres" +_breeze_allowed_backends="sqlite mysql postgres mssql" _breeze_allowed_integrations="cassandra kerberos mongo openldap pinot rabbitmq redis statsd trino all" _breeze_allowed_generate_constraints_modes="source-providers pypi-providers no-providers" # registrys is good here even if it is not correct english. We are adding s automatically to all variables @@ -34,6 +34,7 @@ _breeze_allowed_kubernetes_versions="v1.20.2 v1.19.7 v1.18.15" _breeze_allowed_helm_versions="v3.2.4" _breeze_allowed_kind_versions="v0.10.0" _breeze_allowed_mysql_versions="5.7 8" +_breeze_allowed_mssql_versions="2017-latest 2019-latest" _breeze_allowed_postgres_versions="9.6 10 11 12 13" _breeze_allowed_kind_operations="start stop restart status deploy test shell k9s" _breeze_allowed_executors="KubernetesExecutor CeleryExecutor LocalExecutor CeleryKubernetesExecutor" @@ -54,6 +55,7 @@ _breeze_allowed_installation_methods=".
apache-airflow" _breeze_default_executor=$(echo "${_breeze_allowed_executors}" | awk '{print $1}') _breeze_default_postgres_version=$(echo "${_breeze_allowed_postgres_versions}" | awk '{print $1}') _breeze_default_mysql_version=$(echo "${_breeze_allowed_mysql_versions}" | awk '{print $1}') + _breeze_default_mssql_version=$(echo "${_breeze_allowed_mssql_versions}" | awk '{print $1}') _breeze_default_test_type=$(echo "${_breeze_allowed_test_types}" | awk '{print $1}') _breeze_default_package_format=$(echo "${_breeze_allowed_package_formats}" | awk '{print $1}') } @@ -175,7 +177,7 @@ verbose assume-yes assume-no assume-quit forward-credentials init-script: force-build-images force-pull-images force-pull-base-python-image production-image extras: force-clean-images skip-rebuild-check build-cache-local build-cache-pulled build-cache-disabled disable-pip-cache dockerhub-user: dockerhub-repo: use-github-registry github-registry: github-repository: github-image-id: generate-constraints-mode: -postgres-version: mysql-version: +postgres-version: mysql-version: mssql-version: version-suffix-for-pypi: version-suffix-for-svn: additional-extras: additional-python-deps: additional-dev-deps: additional-runtime-deps: image-tag: disable-mysql-client-installation constraints-location: disable-pip-cache install-from-docker-context-files @@ -287,6 +289,9 @@ function breeze_complete::get_known_values_breeze() { --mysql-version) _breeze_known_values=${_breeze_allowed_mysql_versions} ;; + --mssql-version) + _breeze_known_values=${_breeze_allowed_mssql_versions} + ;; -D | --dockerhub-user) _breeze_known_values="${_breeze_default_dockerhub_user}" ;; diff --git a/docs/apache-airflow/installation.rst b/docs/apache-airflow/installation.rst index e8fb3e7bdc349..ace814e0a2368 100644 --- a/docs/apache-airflow/installation.rst +++ b/docs/apache-airflow/installation.rst @@ -65,6 +65,7 @@ Airflow is tested with: * PostgreSQL: 9.6, 10, 11, 12, 13 * MySQL: 5.7, 8 * SQLite: 3.15.0+ + * MSSQL(Experimental): 2017, 2019 * Kubernetes: 1.18.15 1.19.7 1.20.2 diff --git a/scripts/ci/docker-compose/backend-mssql-port.yml b/scripts/ci/docker-compose/backend-mssql-port.yml new file mode 100644 index 0000000000000..42fd4185da3b6 --- /dev/null +++ b/scripts/ci/docker-compose/backend-mssql-port.yml @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +--- +version: "2.2" +services: + mssql: + ports: + - "${MSSQL_HOST_PORT}:1433" diff --git a/scripts/ci/docker-compose/backend-mssql.yml b/scripts/ci/docker-compose/backend-mssql.yml new file mode 100644 index 0000000000000..93d2da179548b --- /dev/null +++ b/scripts/ci/docker-compose/backend-mssql.yml @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +--- +version: "2.2" +services: + airflow: + environment: + - BACKEND=mssql + - AIRFLOW__CORE__SQL_ALCHEMY_CONN=mssql+pyodbc://sa:Airflow123@mssql:1433/master?driver=ODBC+Driver+17+for+SQL+Server + - AIRFLOW__CELERY__RESULT_BACKEND=db+mssql+pyodbc://sa:Airflow123@mssql:1433/master?driver=ODBC+Driver+17+for+SQL+Server + depends_on: + - mssql + mssql: + image: mcr.microsoft.com/mssql/server:${MSSQL_VERSION} + environment: + - ACCEPT_EULA=Y + - SA_PASSWORD=Airflow123 + volumes: + - mssql-db-volume:/var/opt/mssql diff --git a/scripts/ci/docker-compose/base.yml b/scripts/ci/docker-compose/base.yml index eab6425f5513a..a3d184282cd0b 100644 --- a/scripts/ci/docker-compose/base.yml +++ b/scripts/ci/docker-compose/base.yml @@ -38,3 +38,4 @@ volumes: sqlite-db-volume: postgres-db-volume: mysql-db-volume: + mssql-db-volume: diff --git a/scripts/ci/libraries/_initialization.sh b/scripts/ci/libraries/_initialization.sh index c734ccd14d088..e824963a0c8f4 100644 --- a/scripts/ci/libraries/_initialization.sh +++ b/scripts/ci/libraries/_initialization.sh @@ -23,6 +23,7 @@ CURRENT_KUBERNETES_VERSIONS=() CURRENT_KUBERNETES_MODES=() CURRENT_POSTGRES_VERSIONS=() CURRENT_MYSQL_VERSIONS=() +CURRENT_MSSQL_VERSIONS=() CURRENT_KIND_VERSIONS=() CURRENT_HELM_VERSIONS=() CURRENT_EXECUTOR=() @@ -74,6 +75,7 @@ function initialization::initialize_base_variables() { export WEBSERVER_HOST_PORT=${WEBSERVER_HOST_PORT:="28080"} export POSTGRES_HOST_PORT=${POSTGRES_HOST_PORT:="25433"} export MYSQL_HOST_PORT=${MYSQL_HOST_PORT:="23306"} + export MSSQL_HOST_PORT=${MSSQL_HOST_PORT:="21433"} export FLOWER_HOST_PORT=${FLOWER_HOST_PORT:="25555"} export REDIS_HOST_PORT=${REDIS_HOST_PORT:="26379"} @@ -103,6 +105,10 @@ function initialization::initialize_base_variables() { CURRENT_MYSQL_VERSIONS+=("5.7" "8") export CURRENT_MYSQL_VERSIONS + # Currently supported versions of MSSQL + CURRENT_MSSQL_VERSIONS+=("2017-latest" "2019-latest") + export CURRENT_MSSQL_VERSIONS + BACKEND=${BACKEND:="sqlite"} export BACKEND @@ -112,6 +118,9 @@ function initialization::initialize_base_variables() { # Default MySQL versions export MYSQL_VERSION=${MYSQL_VERSION:=${CURRENT_MYSQL_VERSIONS[0]}} + #Default MS SQL version + export MSSQL_VERSION=${MSSQL_VERSION:=${CURRENT_MSSQL_VERSIONS[0]}} + # If set to true, the database will be reset at entry. 
Works for Postgres and MySQL export DB_RESET=${DB_RESET:="false"} @@ -897,6 +906,7 @@ function initialization::make_constants_read_only() { readonly CURRENT_KUBERNETES_MODES readonly CURRENT_POSTGRES_VERSIONS readonly CURRENT_MYSQL_VERSIONS + readonly CURRENT_MSSQL_VERSIONS readonly CURRENT_KIND_VERSIONS readonly CURRENT_HELM_VERSIONS readonly CURRENT_EXECUTOR diff --git a/scripts/ci/selective_ci_checks.sh b/scripts/ci/selective_ci_checks.sh index 7e77eddec8c7c..9a115a2f768a5 100755 --- a/scripts/ci/selective_ci_checks.sh +++ b/scripts/ci/selective_ci_checks.sh @@ -103,9 +103,19 @@ function output_all_basic_variables() { initialization::ga_output mysql-versions \ "$(initialization::parameters_to_json "${MYSQL_VERSION}")" fi - initialization::ga_output default-mysql-version "${MYSQL_VERSION}" + if [[ ${FULL_TESTS_NEEDED_LABEL} == "true" ]]; then + initialization::ga_output mssql-versions \ + "$(initialization::parameters_to_json "${CURRENT_MSSQL_VERSIONS[@]}")" + else + initialization::ga_output mssql-versions \ + "$(initialization::parameters_to_json "${MSSQL_VERSION}")" + fi + initialization::ga_output default-mssql-version "${MSSQL_VERSION}" + + + initialization::ga_output kind-versions \ "$(initialization::parameters_to_json "${CURRENT_KIND_VERSIONS[@]}")" initialization::ga_output default-kind-version "${KIND_VERSION}" @@ -117,10 +127,12 @@ function output_all_basic_variables() { if [[ ${FULL_TESTS_NEEDED_LABEL} == "true" ]]; then initialization::ga_output postgres-exclude '[{ "python-version": "3.6" }]' initialization::ga_output mysql-exclude '[{ "python-version": "3.7" }]' + initialization::ga_output mssql-exclude '[{ "python-version": "3.7" }]' initialization::ga_output sqlite-exclude '[{ "python-version": "3.8" }]' else initialization::ga_output postgres-exclude '[]' initialization::ga_output mysql-exclude '[]' + initialization::ga_output mssql-exclude '[]' initialization::ga_output sqlite-exclude '[]' fi diff --git a/scripts/ci/testing/ci_run_airflow_testing.sh b/scripts/ci/testing/ci_run_airflow_testing.sh index fa8c0448267e3..9368b873087e1 100755 --- a/scripts/ci/testing/ci_run_airflow_testing.sh +++ b/scripts/ci/testing/ci_run_airflow_testing.sh @@ -96,6 +96,12 @@ function run_all_test_types_in_parallel() { # Remove Integration from list of tests to run in parallel test_types_to_run="${test_types_to_run//Integration/}" run_integration_tests_separately="true" + if [[ ${BACKEND} == "mssql" ]]; then + # Skip running "Integration" tests for low memory condition for mssql + run_integration_tests_separately="false" + else + run_integration_tests_separately="true" + fi fi fi set +e diff --git a/scripts/in_container/check_environment.sh b/scripts/in_container/check_environment.sh index 22c6fe58d2092..3237bc9db1935 100755 --- a/scripts/in_container/check_environment.sh +++ b/scripts/in_container/check_environment.sh @@ -98,6 +98,8 @@ function check_db_backend { check_service "PostgreSQL" "run_nc postgres 5432" "${MAX_CHECK}" elif [[ ${BACKEND} == "mysql" ]]; then check_service "MySQL" "run_nc mysql 3306" "${MAX_CHECK}" + elif [[ ${BACKEND} == "mssql" ]]; then + check_service "MSSQL" "run_nc mssql 1433" "${MAX_CHECK}" elif [[ ${BACKEND} == "sqlite" ]]; then return else diff --git a/tests/bats/breeze/test_breeze_complete.bats b/tests/bats/breeze/test_breeze_complete.bats index c1dfed1659c43..249a493f80095 100644 --- a/tests/bats/breeze/test_breeze_complete.bats +++ b/tests/bats/breeze/test_breeze_complete.bats @@ -239,6 +239,22 @@ assert_equal "${_breeze_default_mysql_version}" 
"${MYSQL_VERSION}" } +@test "Test allowed MSSQL versions same as CURRENT" { + load ../bats_utils + #shellcheck source=breeze-complete + source "${AIRFLOW_SOURCES}/breeze-complete" + + assert_equal "${_breeze_allowed_mssql_versions}" "${CURRENT_MSSQL_VERSIONS[*]}" +} + +@test "Test default MSSQL version same as MSSQL_VERSION" { + load ../bats_utils + #shellcheck source=breeze-complete + source "${AIRFLOW_SOURCES}/breeze-complete" + + assert_equal "${_breeze_default_mssql_version}" "${MSSQL_VERSION}" +} + @test "Test allowed Postgres versions same as CURRENT" { load ../bats_utils #shellcheck source=breeze-complete diff --git a/tests/models/test_renderedtifields.py b/tests/models/test_renderedtifields.py index d83f542f875b9..bd88d5dfa1468 100644 --- a/tests/models/test_renderedtifields.py +++ b/tests/models/test_renderedtifields.py @@ -172,7 +172,14 @@ def test_delete_old_records(self, rtif_num, num_to_keep, remaining_rtifs, expect assert rtif_num == len(result) # Verify old records are deleted and only 'num_to_keep' records are kept - with assert_queries_count(expected_query_count): + # For other DBs,an extra query is fired in RenderedTaskInstanceFields.delete_old_records + expected_query_count_based_on_db = ( + expected_query_count + 1 + if session.bind.dialect.name == "mssql" and expected_query_count != 0 + else expected_query_count + ) + + with assert_queries_count(expected_query_count_based_on_db): RTIF.delete_old_records(task_id=task.task_id, dag_id=task.dag_id, num_to_keep=num_to_keep) result = session.query(RTIF).filter(RTIF.dag_id == dag.dag_id, RTIF.task_id == task.task_id).all() assert remaining_rtifs == len(result) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 35c00fe8368cf..9c2beb6cce87f 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -2098,8 +2098,15 @@ def test_execute_queries_count(self, expected_query_count, mark_success): run_type=DagRunType.SCHEDULED, session=session, ) + # an extra query is fired in RenderedTaskInstanceFields.delete_old_records + # for other DBs. 
delete_old_records is called only when mark_success is False + expected_query_count_based_on_db = ( + expected_query_count + 1 + if session.bind.dialect.name == "mssql" and expected_query_count > 0 and not mark_success + else expected_query_count + ) - with assert_queries_count(expected_query_count): + with assert_queries_count(expected_query_count_based_on_db): ti._run_raw_task(mark_success=mark_success) def test_execute_queries_count_store_serialized(self): @@ -2116,8 +2123,11 @@ def test_execute_queries_count_store_serialized(self): run_type=DagRunType.SCHEDULED, session=session, ) + # an extra query is fired in RenderedTaskInstanceFields.delete_old_records + # for other DBs + expected_query_count_based_on_db = 13 if session.bind.dialect.name == "mssql" else 12 - with assert_queries_count(12): + with assert_queries_count(expected_query_count_based_on_db): ti._run_raw_task() def test_operator_field_with_serialization(self): diff --git a/tests/utils/test_db.py b/tests/utils/test_db.py index e608ff1734c73..601dc6f9fe9da 100644 --- a/tests/utils/test_db.py +++ b/tests/utils/test_db.py @@ -68,6 +68,12 @@ def test_database_schema_and_sqlalchemy_model_are_in_sync(self): lambda t: (t[0] == 'remove_index' and t[1].name == 'permission_view_id'), # from test_security unit test lambda t: (t[0] == 'remove_table' and t[1].name == 'some_model'), + # MSSQL default tables + lambda t: (t[0] == 'remove_table' and t[1].name == 'spt_monitor'), + lambda t: (t[0] == 'remove_table' and t[1].name == 'spt_fallback_db'), + lambda t: (t[0] == 'remove_table' and t[1].name == 'spt_fallback_usg'), + lambda t: (t[0] == 'remove_table' and t[1].name == 'MSreplication_options'), + lambda t: (t[0] == 'remove_table' and t[1].name == 'spt_fallback_dev'), ] for ignore in ignores: diff = [d for d in diff if not ignore(d)]