Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ name: Run linters

on:
push:
branches:
- "master"
paths:
- ".github/workflows/test.yaml"
- "datapipe/**"
- "tests/**"
- "pyproject.toml"

pull_request:
branches:
- "**"
paths:
Expand Down
9 changes: 9 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ name: Run tests

on:
push:
branches:
- "master"
paths:
- ".github/workflows/test.yaml"
- "datapipe/**"
- "tests/**"
- "pyproject.toml"

pull_request:
branches:
- "**"
paths:
Expand Down
9 changes: 9 additions & 0 deletions .github/workflows/test_examples.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ name: Test examples

on:
push:
branches:
- "master"
paths:
- ".github/workflows/test.yaml"
- "datapipe/**"
- "tests/**"
- "pyproject.toml"

pull_request:
branches:
- "**"
paths:
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# WIP

* Fix for getting existing idx of empy meta-table
* Add `keep_existing=True` to `TransformMetaTable` to prevent table metadata conflicts
* Enhance `RedisStore` with multi-node cluster support and password authentication

# 0.14.4

Expand Down
2 changes: 2 additions & 0 deletions datapipe/meta/sql_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,8 @@ def __init__(
name,
dbconn.sqla_metadata,
*self.sql_schema,
# TODO remove in 0.15 release
keep_existing=True,
)

if create_table:
Expand Down
20 changes: 18 additions & 2 deletions datapipe/store/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import pandas as pd
from redis.client import Redis
from redis.cluster import RedisCluster
from redis.cluster import RedisCluster, ClusterNode
from sqlalchemy import Column

from datapipe.store.database import MetaKey
Expand All @@ -23,6 +23,14 @@ def _to_itertuples(df: DataDF, colnames):
return list(df[colnames].itertuples(index=False, name=None))


def _parse_cluster_hosts(hosts):
nodes = []
for host in hosts.split(","):
host, port = host.split(":")
nodes.append(ClusterNode(host, port))
return nodes


class RedisStore(TableStore):
caps = TableStoreCaps(
supports_delete=True,
Expand All @@ -38,12 +46,20 @@ def __init__(
name: str,
data_sql_schema: List[Column],
cluster_mode: bool = False,
password: Optional[str] = None,
) -> None:
self.connection = connection
if not cluster_mode:
self.redis_connection: Union[Redis, RedisCluster] = Redis.from_url(connection, decode_responses=True)
else:
self.redis_connection = RedisCluster.from_url(connection, decode_responses=True)
if "," in connection:
self.redis_connection = RedisCluster(
startup_nodes=_parse_cluster_hosts(connection.removeprefix("redis://")),
password=password,
decode_responses=True
)
else:
self.redis_connection = RedisCluster.from_url(connection, decode_responses=True)

self.name = name
self.data_sql_schema = data_sql_schema
Expand Down