From 63f1ee982b98dccb0c5e7e86cecba02bf70c389f Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 22 Jun 2022 21:34:18 -0700 Subject: [PATCH 01/21] Add Dataset model --- .../versions/0114_2_4_0_add_dataset_model.py | 60 +++++++++++++++++++ airflow/models/dataset.py | 50 ++++++++++++++++ docs/apache-airflow/migrations-ref.rst | 4 +- 3 files changed, 113 insertions(+), 1 deletion(-) create mode 100644 airflow/migrations/versions/0114_2_4_0_add_dataset_model.py create mode 100644 airflow/models/dataset.py diff --git a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py new file mode 100644 index 0000000000000..647fb9b9c959e --- /dev/null +++ b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Add Dataset model + +Revision ID: 0038cd0c28b4 +Revises: 44b7034f6bdc +Create Date: 2022-06-22 14:37:20.880672 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +from sqlalchemy import Integer, func + +from airflow.migrations.db_types import TIMESTAMP, StringID +from airflow.utils.sqlalchemy import ExtendedJSON + +revision = '0038cd0c28b4' +down_revision = '44b7034f6bdc' +branch_labels = None +depends_on = None +airflow_version = '2.4.0' + + +def upgrade(): + """Apply Add Dataset model""" + op.create_table( + 'dataset', + sa.Column('id', Integer, primary_key=True, autoincrement=True), + sa.Column('uri', StringID(length=500)), + sa.Column('extra', ExtendedJSON), + sa.Column('message', sa.String(1000), nullable=False), + sa.Column('created_at', TIMESTAMP, default=func.now(), nullable=False), + sa.Column('updated_at', TIMESTAMP, default=func.now(), nullable=False), + sqlite_autoincrement=True, + ) + op.create_index('idx_uri', 'dataset', ['uri'], unique=True) + + +def downgrade(): + """Unapply Add DagWarning model""" + op.drop_table('dataset') diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py new file mode 100644 index 0000000000000..076796d571f22 --- /dev/null +++ b/airflow/models/dataset.py @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from sqlalchemy import Column, Index, Integer + +from airflow.models.base import Base, StringID +from airflow.utils import timezone +from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime + + +class Dataset(Base): + """A table to store datasets.""" + + id = Column(Integer, primary_key=True, autoincrement=True) + uri = Column(StringID(length=500), nullable=False) + extra = Column(ExtendedJSON, nullable=True) + created_at = Column(UtcDateTime, default=timezone.utcnow(), nullable=False) + updated_at = Column(UtcDateTime, default=timezone.utcnow(), onupdate=timezone.utcnow(), nullable=False) + + __tablename__ = "dataset" + __table_args__ = ( + Index('idx_uri', uri, unique=True), + {'sqlite_autoincrement': True}, + ) + + def __init__(self, uri, extra, **kwargs): + super().__init__(**kwargs) + self.uri = uri + self.extra = extra + + def __eq__(self, other): + return self.uri == other.uri + + def __hash__(self): + return hash((self.uri, self.extra)) diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index b45e6c2de79cd..e5a5b13fa6c96 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -27,7 +27,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=================================+===================+===================+==============================================================+ -| ``44b7034f6bdc`` (head) | ``424117c37d18`` | ``2.4.0`` | compare types between ORM and DB | +| ``0038cd0c28b4`` (head) | ``44b7034f6bdc`` | ``2.4.0`` | Add Dataset model | ++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ +| ``44b7034f6bdc`` | ``424117c37d18`` | ``2.4.0`` | compare types between ORM and DB | +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ | ``424117c37d18`` | ``f5fcbda3e651`` | ``2.4.0`` | Add DagWarning model | +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ From 9971987aeac63f251f1b38f55019025937a41ac7 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 22 Jun 2022 21:44:32 -0700 Subject: [PATCH 02/21] forbid airflow scheme --- airflow/models/dataset.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py index 076796d571f22..4297f82183b00 100644 --- a/airflow/models/dataset.py +++ b/airflow/models/dataset.py @@ -15,6 +15,8 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
+from typing import Dict, Optional +from urllib.parse import urlparse from sqlalchemy import Column, Index, Integer @@ -38,8 +40,11 @@ class Dataset(Base): {'sqlite_autoincrement': True}, ) - def __init__(self, uri, extra, **kwargs): + def __init__(self, uri, extra: Optional[Dict] = None, **kwargs): super().__init__(**kwargs) + parsed = urlparse(uri) + if parsed.scheme and parsed.scheme.lower() == 'airflow': + raise ValueError("Scheme `airflow` is reserved.") self.uri = uri self.extra = extra From 07b5f68c7b4efc72071a3f96fb5d4447d5207c82 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 22 Jun 2022 21:49:55 -0700 Subject: [PATCH 03/21] Update airflow/models/dataset.py Co-authored-by: Tzu-ping Chung --- airflow/models/dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py index 4297f82183b00..522d45b40d426 100644 --- a/airflow/models/dataset.py +++ b/airflow/models/dataset.py @@ -44,7 +44,7 @@ def __init__(self, uri, extra: Optional[Dict] = None, **kwargs): super().__init__(**kwargs) parsed = urlparse(uri) if parsed.scheme and parsed.scheme.lower() == 'airflow': - raise ValueError("Scheme `airflow` is reserved.") + raise ValueError("Scheme `airflow://` is reserved.") self.uri = uri self.extra = extra From 5f70b4323c65fb44072cfc3377d14880182a1bfb Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 22 Jun 2022 21:51:02 -0700 Subject: [PATCH 04/21] remove extraneous field --- airflow/migrations/versions/0114_2_4_0_add_dataset_model.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py index 647fb9b9c959e..e0b13d1159dfc 100644 --- a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py +++ b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py @@ -47,7 +47,6 @@ def upgrade(): sa.Column('id', Integer, primary_key=True, autoincrement=True), sa.Column('uri', StringID(length=500)), sa.Column('extra', ExtendedJSON), - sa.Column('message', sa.String(1000), nullable=False), sa.Column('created_at', TIMESTAMP, default=func.now(), nullable=False), sa.Column('updated_at', TIMESTAMP, default=func.now(), nullable=False), sqlite_autoincrement=True, From d3354654e933a5c6500b58916c0ac3d0467ffeb3 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Thu, 23 Jun 2022 09:46:28 -0700 Subject: [PATCH 05/21] Update airflow/models/dataset.py Co-authored-by: Ash Berlin-Taylor --- airflow/models/dataset.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py index 522d45b40d426..1bbc60ba74a3d 100644 --- a/airflow/models/dataset.py +++ b/airflow/models/dataset.py @@ -40,13 +40,11 @@ class Dataset(Base): {'sqlite_autoincrement': True}, ) - def __init__(self, uri, extra: Optional[Dict] = None, **kwargs): - super().__init__(**kwargs) + def __init__(self, uri: str, extra: Optional[Dict] = None, **kwargs): parsed = urlparse(uri) if parsed.scheme and parsed.scheme.lower() == 'airflow': raise ValueError("Scheme `airflow://` is reserved.") - self.uri = uri - self.extra = extra + super().__init__(uri=uri, extra=extra, **kwargs) def __eq__(self, other): return self.uri == other.uri From c2dc83e4abee2c205a8b94f396ae9fd8b534e398 Mon Sep 17 00:00:00 2001 From: Daniel Standish 
<15932138+dstandish@users.noreply.github.com> Date: Thu, 23 Jun 2022 09:46:38 -0700 Subject: [PATCH 06/21] Update airflow/migrations/versions/0113_2_4_0_add_dataset_model.py Co-authored-by: Ash Berlin-Taylor --- airflow/migrations/versions/0114_2_4_0_add_dataset_model.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py index e0b13d1159dfc..99048213dc696 100644 --- a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py +++ b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py @@ -55,5 +55,5 @@ def upgrade(): def downgrade(): - """Unapply Add DagWarning model""" + """Unapply Add Dataset model""" op.drop_table('dataset') From 47255db070ed262405590f5ded234fa504d472dd Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Thu, 23 Jun 2022 10:01:02 -0700 Subject: [PATCH 07/21] Update airflow/models/dataset.py --- airflow/models/dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py index 1bbc60ba74a3d..6ce5cab13e408 100644 --- a/airflow/models/dataset.py +++ b/airflow/models/dataset.py @@ -43,7 +43,7 @@ class Dataset(Base): def __init__(self, uri: str, extra: Optional[Dict] = None, **kwargs): parsed = urlparse(uri) if parsed.scheme and parsed.scheme.lower() == 'airflow': - raise ValueError("Scheme `airflow://` is reserved.") + raise ValueError("Scheme `airflow` is reserved.") super().__init__(uri=uri, extra=extra, **kwargs) def __eq__(self, other): From 5f2045b2a86daa6190105a1f198ffd0f32156223 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Thu, 23 Jun 2022 11:08:56 -0700 Subject: [PATCH 08/21] add imports --- airflow/__init__.py | 1 + airflow/models/__init__.py | 2 ++ 2 files changed, 3 insertions(+) diff --git a/airflow/__init__.py b/airflow/__init__.py index cbbb03dd1be29..826ffa80f2640 100644 --- a/airflow/__init__.py +++ b/airflow/__init__.py @@ -57,6 +57,7 @@ # Things to lazy import in form 'name': 'path.to.module' __lazy_imports = { 'DAG': 'airflow.models.dag', + 'Dataset': 'airflow.models.dataset', 'XComArg': 'airflow.models.xcom_arg', 'AirflowException': 'airflow.exceptions', } diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py index 4cb19eecdeda2..ecf14576eef6d 100644 --- a/airflow/models/__init__.py +++ b/airflow/models/__init__.py @@ -26,6 +26,7 @@ from airflow.models.dagpickle import DagPickle from airflow.models.dagrun import DagRun from airflow.models.dagwarning import DagWarning +from airflow.models.dataset import Dataset from airflow.models.db_callback_request import DbCallbackRequest from airflow.models.errors import ImportError from airflow.models.log import Log @@ -58,6 +59,7 @@ "DagPickle", "DagRun", "DagTag", + "Dataset", "DbCallbackRequest", "ImportError", "Log", From f893dd061236747f47dd2e81b9eb6359f3a29b77 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Thu, 23 Jun 2022 21:00:26 -0700 Subject: [PATCH 09/21] kaxil updates --- .../versions/0114_2_4_0_add_dataset_model.py | 4 ++-- airflow/models/dataset.py | 14 +++++++++++--- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py index 99048213dc696..3b61cbff5e292 100644 --- 
a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py +++ b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py @@ -45,11 +45,11 @@ def upgrade(): op.create_table( 'dataset', sa.Column('id', Integer, primary_key=True, autoincrement=True), - sa.Column('uri', StringID(length=500)), + sa.Column('uri', StringID(length=1000)), sa.Column('extra', ExtendedJSON), sa.Column('created_at', TIMESTAMP, default=func.now(), nullable=False), sa.Column('updated_at', TIMESTAMP, default=func.now(), nullable=False), - sqlite_autoincrement=True, + sqlite_autoincrement=True, # ensures PK values not reused ) op.create_index('idx_uri', 'dataset', ['uri'], unique=True) diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py index 6ce5cab13e408..22015dbee0f4e 100644 --- a/airflow/models/dataset.py +++ b/airflow/models/dataset.py @@ -26,10 +26,15 @@ class Dataset(Base): - """A table to store datasets.""" + """ + A table to store datasets. + + :param uri: a string that uniquely identifies the dataset + :param extra: JSON field for arbitrary extra info + """ id = Column(Integer, primary_key=True, autoincrement=True) - uri = Column(StringID(length=500), nullable=False) + uri = Column(StringID(length=1000), nullable=False) extra = Column(ExtendedJSON, nullable=True) created_at = Column(UtcDateTime, default=timezone.utcnow(), nullable=False) updated_at = Column(UtcDateTime, default=timezone.utcnow(), onupdate=timezone.utcnow(), nullable=False) @@ -37,7 +42,7 @@ class Dataset(Base): __tablename__ = "dataset" __table_args__ = ( Index('idx_uri', uri, unique=True), - {'sqlite_autoincrement': True}, + {'sqlite_autoincrement': True}, # ensures PK values not reused ) def __init__(self, uri: str, extra: Optional[Dict] = None, **kwargs): @@ -51,3 +56,6 @@ def __eq__(self, other): def __hash__(self): return hash((self.uri, self.extra)) + + def __repr__(self): + return f"{self.__class__.__name__}(uri={self.uri!r}, extra={self.extra!r})" From 93449d7b54cb4b85f3911ec9eca79124f87069c4 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Thu, 23 Jun 2022 21:24:49 -0700 Subject: [PATCH 10/21] small --- airflow/migrations/versions/0114_2_4_0_add_dataset_model.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py index 3b61cbff5e292..de21b9ebe3737 100644 --- a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py +++ b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py @@ -26,8 +26,6 @@ import sqlalchemy as sa from alembic import op - -# revision identifiers, used by Alembic. 
from sqlalchemy import Integer, func from airflow.migrations.db_types import TIMESTAMP, StringID From bf2d76654c9f27d8079fc95db5febefefdc0c055 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Fri, 24 Jun 2022 08:10:34 -0700 Subject: [PATCH 11/21] use ascii for mysql --- .../versions/0114_2_4_0_add_dataset_model.py | 12 ++++++++---- airflow/models/dataset.py | 13 ++++++++++--- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py index de21b9ebe3737..5e63af1515ed1 100644 --- a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py +++ b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py @@ -26,9 +26,9 @@ import sqlalchemy as sa from alembic import op -from sqlalchemy import Integer, func +from sqlalchemy import Integer, String, func -from airflow.migrations.db_types import TIMESTAMP, StringID +from airflow.migrations.db_types import TIMESTAMP from airflow.utils.sqlalchemy import ExtendedJSON revision = '0038cd0c28b4' @@ -43,8 +43,12 @@ def upgrade(): op.create_table( 'dataset', sa.Column('id', Integer, primary_key=True, autoincrement=True), - sa.Column('uri', StringID(length=1000)), - sa.Column('extra', ExtendedJSON), + sa.Column( + 'uri', + String(length=3000).with_variant(String(length=3000, collation='ascii_general_ci'), 'mysql'), + nullable=False, + ), + sa.Column('extra', ExtendedJSON, nullable=True), sa.Column('created_at', TIMESTAMP, default=func.now(), nullable=False), sa.Column('updated_at', TIMESTAMP, default=func.now(), nullable=False), sqlite_autoincrement=True, # ensures PK values not reused diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py index 22015dbee0f4e..81b59c023bb5d 100644 --- a/airflow/models/dataset.py +++ b/airflow/models/dataset.py @@ -18,9 +18,9 @@ from typing import Dict, Optional from urllib.parse import urlparse -from sqlalchemy import Column, Index, Integer +from sqlalchemy import Column, Index, Integer, String -from airflow.models.base import Base, StringID +from airflow.models.base import Base from airflow.utils import timezone from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime @@ -34,7 +34,10 @@ class Dataset(Base): """ id = Column(Integer, primary_key=True, autoincrement=True) - uri = Column(StringID(length=1000), nullable=False) + uri = Column( + String(length=3000).with_variant(String(length=3000, collation='ascii_general_ci'), 'mysql'), + nullable=False, + ) extra = Column(ExtendedJSON, nullable=True) created_at = Column(UtcDateTime, default=timezone.utcnow(), nullable=False) updated_at = Column(UtcDateTime, default=timezone.utcnow(), onupdate=timezone.utcnow(), nullable=False) @@ -46,6 +49,10 @@ class Dataset(Base): ) def __init__(self, uri: str, extra: Optional[Dict] = None, **kwargs): + try: + uri.encode('ascii') + except UnicodeEncodeError: + raise ValueError('URI must be ascii') parsed = urlparse(uri) if parsed.scheme and parsed.scheme.lower() == 'airflow': raise ValueError("Scheme `airflow` is reserved.") From 10f9f25f5ee19a2c2ed174c769f3a2d94e3353b2 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Fri, 24 Jun 2022 08:36:40 -0700 Subject: [PATCH 12/21] fix defaults --- airflow/migrations/versions/0114_2_4_0_add_dataset_model.py | 4 ++-- airflow/models/dataset.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git 
a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py index 5e63af1515ed1..ab9e1aee60020 100644 --- a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py +++ b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py @@ -49,8 +49,8 @@ def upgrade(): nullable=False, ), sa.Column('extra', ExtendedJSON, nullable=True), - sa.Column('created_at', TIMESTAMP, default=func.now(), nullable=False), - sa.Column('updated_at', TIMESTAMP, default=func.now(), nullable=False), + sa.Column('created_at', TIMESTAMP, default=func.now, nullable=False), + sa.Column('updated_at', TIMESTAMP, default=func.now, nullable=False), sqlite_autoincrement=True, # ensures PK values not reused ) op.create_index('idx_uri', 'dataset', ['uri'], unique=True) diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py index 81b59c023bb5d..18908cb7e1481 100644 --- a/airflow/models/dataset.py +++ b/airflow/models/dataset.py @@ -39,8 +39,8 @@ class Dataset(Base): nullable=False, ) extra = Column(ExtendedJSON, nullable=True) - created_at = Column(UtcDateTime, default=timezone.utcnow(), nullable=False) - updated_at = Column(UtcDateTime, default=timezone.utcnow(), onupdate=timezone.utcnow(), nullable=False) + created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False) + updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False) __tablename__ = "dataset" __table_args__ = ( From a9617e54c4a371f62ffdc760ce9c18100bcf393d Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Fri, 24 Jun 2022 14:34:31 -0700 Subject: [PATCH 13/21] use latin1_general_cs --- .../versions/0114_2_4_0_add_dataset_model.py | 10 +++++++++- airflow/models/dataset.py | 10 +++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py index ab9e1aee60020..3569efafa8d5d 100644 --- a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py +++ b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py @@ -45,7 +45,15 @@ def upgrade(): sa.Column('id', Integer, primary_key=True, autoincrement=True), sa.Column( 'uri', - String(length=3000).with_variant(String(length=3000, collation='ascii_general_ci'), 'mysql'), + String(length=3000).with_variant( + String( + length=3000, + # latin1 allows for more indexed length in mysql + # and this field should only be ascii chars + collation='latin1_general_cs', + ), + 'mysql', + ), nullable=False, ), sa.Column('extra', ExtendedJSON, nullable=True), diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py index 18908cb7e1481..bbf67b6687f29 100644 --- a/airflow/models/dataset.py +++ b/airflow/models/dataset.py @@ -35,7 +35,15 @@ class Dataset(Base): id = Column(Integer, primary_key=True, autoincrement=True) uri = Column( - String(length=3000).with_variant(String(length=3000, collation='ascii_general_ci'), 'mysql'), + String(length=3000).with_variant( + String( + length=3000, + # latin1 allows for more indexed length in mysql + # and this field should only be ascii chars + collation='latin1_general_cs', + ), + 'mysql', + ), nullable=False, ) extra = Column(ExtendedJSON, nullable=True) From 520e14673dc0900176f48694d873d9536494808c Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Fri, 24 Jun 2022 15:19:14 -0700 Subject: [PATCH 14/21] Update 
airflow/models/dataset.py Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> --- airflow/models/dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py index bbf67b6687f29..58e9d83751853 100644 --- a/airflow/models/dataset.py +++ b/airflow/models/dataset.py @@ -56,7 +56,7 @@ class Dataset(Base): {'sqlite_autoincrement': True}, # ensures PK values not reused ) - def __init__(self, uri: str, extra: Optional[Dict] = None, **kwargs): + def __init__(self, uri: str, **kwargs): try: uri.encode('ascii') except UnicodeEncodeError: @@ -64,7 +64,7 @@ def __init__(self, uri: str, extra: Optional[Dict] = None, **kwargs): parsed = urlparse(uri) if parsed.scheme and parsed.scheme.lower() == 'airflow': raise ValueError("Scheme `airflow` is reserved.") - super().__init__(uri=uri, extra=extra, **kwargs) + super().__init__(uri=uri, **kwargs) def __eq__(self, other): return self.uri == other.uri From 080fc5ef9bbe569dce2d7bb14409324fa1dcd36e Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Fri, 24 Jun 2022 17:10:53 -0600 Subject: [PATCH 15/21] Update airflow/models/dataset.py --- airflow/models/dataset.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py index 58e9d83751853..f06eac6676680 100644 --- a/airflow/models/dataset.py +++ b/airflow/models/dataset.py @@ -15,7 +15,6 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -from typing import Dict, Optional from urllib.parse import urlparse from sqlalchemy import Column, Index, Integer, String From 57f221b9c45b5ad2f03cedd54874b288c6320903 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Fri, 24 Jun 2022 20:45:14 -0700 Subject: [PATCH 16/21] fix test --- tests/utils/test_db_cleanup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py index 430e21215d811..5c0554591902d 100644 --- a/tests/utils/test_db_cleanup.py +++ b/tests/utils/test_db_cleanup.py @@ -252,8 +252,9 @@ def test_no_models_missing(self): all_models.update({class_.__tablename__: class_}) exclusion_list = { 'variable', # leave alone + 'dataset', # not good way to know if "stale" 'trigger', # self-maintaining - 'task_map', # TODO: add datetime column to TaskMap so we can include it here + 'task_map', # keys to TI, so no need 'serialized_dag', # handled through FK to Dag 'log_template', # not a significant source of data; age not indicative of staleness 'dag_tag', # not a significant source of data; age not indicative of staleness, From 579fb22d4dbf01a05ee0f5979b84991c638edffc Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Sat, 25 Jun 2022 08:51:53 -0700 Subject: [PATCH 17/21] fix sqlite sequence table issue --- tests/utils/test_db.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/utils/test_db.py b/tests/utils/test_db.py index 60511f242862c..b6fe56b2d7795 100644 --- a/tests/utils/test_db.py +++ b/tests/utils/test_db.py @@ -76,7 +76,10 @@ def test_database_schema_and_sqlalchemy_model_are_in_sync(self): # Ignore flask-session table/index lambda t: (t[0] == 'remove_table' and t[1].name == 'session'), lambda t: (t[0] == 'remove_index' and t[1].name == 'session_id'), + # sqlite sequence is used for autoincrementing 
columns created with `sqlite_autoincrement` option + lambda t: (t[0] == 'remove_table' and t[1].name == 'sqlite_sequence'), ] + for ignore in ignores: diff = [d for d in diff if not ignore(d)] From 379e9b79642e5b42f9212468997005e0ed13efec Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Sat, 25 Jun 2022 09:35:49 -0700 Subject: [PATCH 18/21] remove `extra` field for yagni reasons --- airflow/migrations/versions/0114_2_4_0_add_dataset_model.py | 2 -- airflow/models/dataset.py | 6 ++---- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py index 3569efafa8d5d..06700d0515249 100644 --- a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py +++ b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py @@ -29,7 +29,6 @@ from sqlalchemy import Integer, String, func from airflow.migrations.db_types import TIMESTAMP -from airflow.utils.sqlalchemy import ExtendedJSON revision = '0038cd0c28b4' down_revision = '44b7034f6bdc' @@ -56,7 +55,6 @@ def upgrade(): ), nullable=False, ), - sa.Column('extra', ExtendedJSON, nullable=True), sa.Column('created_at', TIMESTAMP, default=func.now, nullable=False), sa.Column('updated_at', TIMESTAMP, default=func.now, nullable=False), sqlite_autoincrement=True, # ensures PK values not reused diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py index f06eac6676680..f59faf2e53ab3 100644 --- a/airflow/models/dataset.py +++ b/airflow/models/dataset.py @@ -21,7 +21,7 @@ from airflow.models.base import Base from airflow.utils import timezone -from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime +from airflow.utils.sqlalchemy import UtcDateTime class Dataset(Base): @@ -29,7 +29,6 @@ class Dataset(Base): A table to store datasets. :param uri: a string that uniquely identifies the dataset - :param extra: JSON field for arbitrary extra info """ id = Column(Integer, primary_key=True, autoincrement=True) @@ -45,7 +44,6 @@ class Dataset(Base): ), nullable=False, ) - extra = Column(ExtendedJSON, nullable=True) created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False) updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False) @@ -72,4 +70,4 @@ def __hash__(self): return hash((self.uri, self.extra)) def __repr__(self): - return f"{self.__class__.__name__}(uri={self.uri!r}, extra={self.extra!r})" + return f"{self.__class__.__name__}(uri={self.uri!r})" From 7e6a15e9370aee1d4b4c295d9a4503aa303125dc Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Sun, 26 Jun 2022 08:20:44 -0700 Subject: [PATCH 19/21] Revert "remove `extra` field for yagni reasons" This reverts commit c954d8dd62338faaed9323bc5e9c0d959e92c78c. 
--- airflow/migrations/versions/0114_2_4_0_add_dataset_model.py | 2 ++ airflow/models/dataset.py | 6 ++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py index 06700d0515249..3569efafa8d5d 100644 --- a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py +++ b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py @@ -29,6 +29,7 @@ from sqlalchemy import Integer, String, func from airflow.migrations.db_types import TIMESTAMP +from airflow.utils.sqlalchemy import ExtendedJSON revision = '0038cd0c28b4' down_revision = '44b7034f6bdc' @@ -55,6 +56,7 @@ def upgrade(): ), nullable=False, ), + sa.Column('extra', ExtendedJSON, nullable=True), sa.Column('created_at', TIMESTAMP, default=func.now, nullable=False), sa.Column('updated_at', TIMESTAMP, default=func.now, nullable=False), sqlite_autoincrement=True, # ensures PK values not reused diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py index f59faf2e53ab3..f06eac6676680 100644 --- a/airflow/models/dataset.py +++ b/airflow/models/dataset.py @@ -21,7 +21,7 @@ from airflow.models.base import Base from airflow.utils import timezone -from airflow.utils.sqlalchemy import UtcDateTime +from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime class Dataset(Base): @@ -29,6 +29,7 @@ class Dataset(Base): A table to store datasets. :param uri: a string that uniquely identifies the dataset + :param extra: JSON field for arbitrary extra info """ id = Column(Integer, primary_key=True, autoincrement=True) @@ -44,6 +45,7 @@ class Dataset(Base): ), nullable=False, ) + extra = Column(ExtendedJSON, nullable=True) created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False) updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False) @@ -70,4 +72,4 @@ def __hash__(self): return hash((self.uri, self.extra)) def __repr__(self): - return f"{self.__class__.__name__}(uri={self.uri!r})" + return f"{self.__class__.__name__}(uri={self.uri!r}, extra={self.extra!r})" From 474b4851f6c8bce2bfede65da33bcd1e69373ddf Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Mon, 27 Jun 2022 10:15:30 -0700 Subject: [PATCH 20/21] fix hash --- airflow/models/dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py index f06eac6676680..79d58c7a80a87 100644 --- a/airflow/models/dataset.py +++ b/airflow/models/dataset.py @@ -69,7 +69,7 @@ def __eq__(self, other): return self.uri == other.uri def __hash__(self): - return hash((self.uri, self.extra)) + return hash(self.uri) def __repr__(self): return f"{self.__class__.__name__}(uri={self.uri!r}, extra={self.extra!r})" From ba5f935b566271cfd3c2c971b6583eb692470115 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Mon, 27 Jun 2022 12:30:49 -0700 Subject: [PATCH 21/21] add unique suffix --- .../migrations/versions/0114_2_4_0_add_dataset_model.py | 8 ++++---- airflow/models/dataset.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py index 3569efafa8d5d..838f8780b0e70 100644 --- a/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py +++ b/airflow/migrations/versions/0114_2_4_0_add_dataset_model.py @@ -26,7 +26,7 @@ 
import sqlalchemy as sa from alembic import op -from sqlalchemy import Integer, String, func +from sqlalchemy import Integer, String from airflow.migrations.db_types import TIMESTAMP from airflow.utils.sqlalchemy import ExtendedJSON @@ -57,11 +57,11 @@ def upgrade(): nullable=False, ), sa.Column('extra', ExtendedJSON, nullable=True), - sa.Column('created_at', TIMESTAMP, default=func.now, nullable=False), - sa.Column('updated_at', TIMESTAMP, default=func.now, nullable=False), + sa.Column('created_at', TIMESTAMP, nullable=False), + sa.Column('updated_at', TIMESTAMP, nullable=False), sqlite_autoincrement=True, # ensures PK values not reused ) - op.create_index('idx_uri', 'dataset', ['uri'], unique=True) + op.create_index('idx_uri_unique', 'dataset', ['uri'], unique=True) def downgrade(): diff --git a/airflow/models/dataset.py b/airflow/models/dataset.py index 79d58c7a80a87..49f5dfd1f66be 100644 --- a/airflow/models/dataset.py +++ b/airflow/models/dataset.py @@ -51,7 +51,7 @@ class Dataset(Base): __tablename__ = "dataset" __table_args__ = ( - Index('idx_uri', uri, unique=True), + Index('idx_uri_unique', uri, unique=True), {'sqlite_autoincrement': True}, # ensures PK values not reused )
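
A minimal sketch of how the Dataset model behaves once the full series above is applied. This is illustrative only, not part of the patches; it assumes an environment where `airflow.models.dataset` is importable (i.e. an installed Airflow containing this change). No database session is needed, because everything exercised here — the ASCII-only URI check, the reserved `airflow` scheme check, and the URI-based equality/hashing — lives in the Python class itself.

    # Illustrative usage of the Dataset model as defined after PATCH 21/21.
    from airflow.models.dataset import Dataset

    # A valid dataset: plain-ASCII URI; the optional `extra` JSON payload is
    # accepted through **kwargs and stored in the ExtendedJSON column.
    ds = Dataset(uri="s3://my-bucket/my-key", extra={"owner": "data-team"})
    print(ds)  # Dataset(uri='s3://my-bucket/my-key', extra={'owner': 'data-team'})

    # Equality and hashing are based on the URI alone (see __eq__/__hash__).
    assert ds == Dataset(uri="s3://my-bucket/my-key")
    assert hash(ds) == hash(Dataset(uri="s3://my-bucket/my-key"))

    # The constructor rejects the reserved `airflow` scheme ...
    try:
        Dataset(uri="airflow://example/dataset")
    except ValueError as err:
        print(err)  # Scheme `airflow` is reserved.

    # ... and non-ASCII URIs, matching the ascii/latin1 column constraints.
    try:
        Dataset(uri="s3://bücket/key")
    except ValueError as err:
        print(err)  # URI must be ascii

Once the migration has run, the `idx_uri_unique` index enforces the same URI uniqueness at the database level that the model relies on in Python.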