diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index cf74a3bb..0eedb89e 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -2,6 +2,15 @@ name: Run linters on: push: + branches: + - "master" + paths: + - ".github/workflows/lint.yaml" + - "datapipe/**" + - "tests/**" + - "pyproject.toml" + + pull_request: branches: - "**" paths: diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index d97f85b7..57c496bd 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -2,6 +2,15 @@ name: Run tests on: push: + branches: + - "master" + paths: + - ".github/workflows/test.yaml" + - "datapipe/**" + - "tests/**" + - "pyproject.toml" + + pull_request: branches: - "**" paths: diff --git a/.github/workflows/test_examples.yaml b/.github/workflows/test_examples.yaml index 34aa9b37..6f3215d7 100644 --- a/.github/workflows/test_examples.yaml +++ b/.github/workflows/test_examples.yaml @@ -2,6 +2,15 @@ name: Test examples on: push: + branches: + - "master" + paths: + - ".github/workflows/test_examples.yaml" + - "datapipe/**" + - "tests/**" + - "pyproject.toml" + + pull_request: branches: - "**" paths: diff --git a/CHANGELOG.md b/CHANGELOG.md index e54938ad..05d7375c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ # WIP * Fix for getting existing idx of empy meta-table +* Add `keep_existing=True` to `TransformMetaTable` to prevent table metadata conflicts +* Enhance `RedisStore` with multi-node cluster support and password authentication # 0.14.4 diff --git a/datapipe/meta/sql_meta.py b/datapipe/meta/sql_meta.py index e1028858..c07185ee 100644 --- a/datapipe/meta/sql_meta.py +++ b/datapipe/meta/sql_meta.py @@ -451,6 +451,8 @@ def __init__( name, dbconn.sqla_metadata, *self.sql_schema, + # TODO remove in 0.15 release + keep_existing=True, ) if create_table: diff --git a/datapipe/store/redis.py b/datapipe/store/redis.py index 8fa90c03..4bd89bf6 100644 --- a/datapipe/store/redis.py +++ 
b/datapipe/store/redis.py @@ -3,7 +3,7 @@ import pandas as pd from redis.client import Redis -from redis.cluster import RedisCluster +from redis.cluster import RedisCluster, ClusterNode from sqlalchemy import Column from datapipe.store.database import MetaKey @@ -23,6 +23,14 @@ def _to_itertuples(df: DataDF, colnames): return list(df[colnames].itertuples(index=False, name=None)) +def _parse_cluster_hosts(hosts): + nodes = [] + for host in hosts.split(","): + host, port = host.split(":") + nodes.append(ClusterNode(host, int(port))) + return nodes + + class RedisStore(TableStore): caps = TableStoreCaps( supports_delete=True, @@ -38,12 +46,20 @@ def __init__( name: str, data_sql_schema: List[Column], cluster_mode: bool = False, + password: Optional[str] = None, ) -> None: self.connection = connection if not cluster_mode: self.redis_connection: Union[Redis, RedisCluster] = Redis.from_url(connection, decode_responses=True) else: - self.redis_connection = RedisCluster.from_url(connection, decode_responses=True) + if "," in connection: + self.redis_connection = RedisCluster( + startup_nodes=_parse_cluster_hosts(connection.removeprefix("redis://")), + password=password, + decode_responses=True + ) + else: + self.redis_connection = RedisCluster.from_url(connection, decode_responses=True) self.name = name self.data_sql_schema = data_sql_schema