From 120606e4d589f88e19b2e12b71f8f42451c73851 Mon Sep 17 00:00:00 2001 From: Dmitriy Vinogradov Date: Mon, 13 Oct 2025 17:40:20 +0300 Subject: [PATCH 1/5] [Looky-7769] fix: Add keep_existing=True to TransformMetaTable and enhance Redis Cluster support --- datapipe/meta/sql_meta.py | 1 + datapipe/store/redis.py | 20 ++++++++++++++++++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/datapipe/meta/sql_meta.py b/datapipe/meta/sql_meta.py index e1028858..e9c12ba9 100644 --- a/datapipe/meta/sql_meta.py +++ b/datapipe/meta/sql_meta.py @@ -451,6 +451,7 @@ def __init__( name, dbconn.sqla_metadata, *self.sql_schema, + keep_existing=True, ) if create_table: diff --git a/datapipe/store/redis.py b/datapipe/store/redis.py index 8fa90c03..82174193 100644 --- a/datapipe/store/redis.py +++ b/datapipe/store/redis.py @@ -3,7 +3,7 @@ import pandas as pd from redis.client import Redis -from redis.cluster import RedisCluster +from redis.cluster import RedisCluster, ClusterNode from sqlalchemy import Column from datapipe.store.database import MetaKey @@ -23,6 +23,14 @@ def _to_itertuples(df: DataDF, colnames): return list(df[colnames].itertuples(index=False, name=None)) +def _parse_cluster_hosts(hosts): + nodes = [] + for host in hosts.split(","): + host, port = host.split(":") + nodes.append(ClusterNode(host, port)) + return nodes + + class RedisStore(TableStore): caps = TableStoreCaps( supports_delete=True, @@ -38,12 +46,20 @@ def __init__( name: str, data_sql_schema: List[Column], cluster_mode: bool = False, + password: str = None, ) -> None: self.connection = connection if not cluster_mode: self.redis_connection: Union[Redis, RedisCluster] = Redis.from_url(connection, decode_responses=True) else: - self.redis_connection = RedisCluster.from_url(connection, decode_responses=True) + if "," in connection: + self.redis_connection = RedisCluster( + startup_nodes=_parse_cluster_hosts(connection.removeprefix("redis://")), + password=password, + decode_responses=True + ) + else: + self.redis_connection = RedisCluster.from_url(connection, decode_responses=True) self.name = name self.data_sql_schema = data_sql_schema From 6a7d0847a39fd984459c03db653acc42443d9733 Mon Sep 17 00:00:00 2001 From: Dmitriy Vinogradov Date: Mon, 13 Oct 2025 17:57:11 +0300 Subject: [PATCH 2/5] [Looky-7769] docs: Update CHANGELOG with fixes from looky-crunch branch --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e54938ad..05d7375c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ # WIP * Fix for getting existing idx of empy meta-table +* Add `keep_existing=True` to `TransformMetaTable` to prevent table metadata conflicts +* Enhance `RedisStore` with multi-node cluster support and password authentication # 0.14.4 From 8a0cb2a67b76625608a9513bd4aba9cfc7d98027 Mon Sep 17 00:00:00 2001 From: Dmitriy Vinogradov Date: Mon, 13 Oct 2025 18:03:01 +0300 Subject: [PATCH 3/5] [Looky-7769] fix: Fix mypy type error for password parameter in RedisStore --- datapipe/store/redis.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datapipe/store/redis.py b/datapipe/store/redis.py index 82174193..4bd89bf6 100644 --- a/datapipe/store/redis.py +++ b/datapipe/store/redis.py @@ -46,7 +46,7 @@ def __init__( name: str, data_sql_schema: List[Column], cluster_mode: bool = False, - password: str = None, + password: Optional[str] = None, ) -> None: self.connection = connection if not cluster_mode: From e29fc356c1f91005d60d66cfa793b8c39e1ea8ce Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Tue, 14 Oct 2025 15:29:09 +0400 Subject: [PATCH 4/5] Add TODO comment for future code removal Added a TODO comment to remove code in future release. --- datapipe/meta/sql_meta.py | 1 + 1 file changed, 1 insertion(+) diff --git a/datapipe/meta/sql_meta.py b/datapipe/meta/sql_meta.py index e9c12ba9..c07185ee 100644 --- a/datapipe/meta/sql_meta.py +++ b/datapipe/meta/sql_meta.py @@ -451,6 +451,7 @@ def __init__( name, dbconn.sqla_metadata, *self.sql_schema, + # TODO remove in 0.15 release keep_existing=True, ) From 4fcb686f48ecd8b454050c53d214aa25616d78a7 Mon Sep 17 00:00:00 2001 From: Andrey Tatarinov Date: Tue, 14 Oct 2025 15:37:08 +0400 Subject: [PATCH 5/5] fix actions rules for from-the-fork PRs --- .github/workflows/lint.yaml | 9 +++++++++ .github/workflows/test.yaml | 9 +++++++++ .github/workflows/test_examples.yaml | 9 +++++++++ 3 files changed, 27 insertions(+) diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index cf74a3bb..0eedb89e 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -2,6 +2,15 @@ name: Run linters on: push: + branches: + - "master" + paths: + - ".github/workflows/test.yaml" + - "datapipe/**" + - "tests/**" + - "pyproject.toml" + + pull_request: branches: - "**" paths: diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index d97f85b7..57c496bd 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -2,6 +2,15 @@ name: Run tests on: push: + branches: + - "master" + paths: + - ".github/workflows/test.yaml" + - "datapipe/**" + - "tests/**" + - "pyproject.toml" + + pull_request: branches: - "**" paths: diff --git a/.github/workflows/test_examples.yaml b/.github/workflows/test_examples.yaml index 34aa9b37..6f3215d7 100644 --- a/.github/workflows/test_examples.yaml +++ b/.github/workflows/test_examples.yaml @@ -2,6 +2,15 @@ name: Test examples on: push: + branches: + - "master" + paths: + - ".github/workflows/test.yaml" + - "datapipe/**" + - "tests/**" + - "pyproject.toml" + + pull_request: branches: - "**" paths: