103 commits
639b754
Added separate force-vectors-to-RAM switch to Qdrant store.
Great-Frosty Mar 16, 2025
6f13a4f
Changelog updated for qdrant store.
Great-Frosty Aug 11, 2025
281bf5b
Phase 1: Add TransformInputOffsetTable infrastructure
halconel Oct 8, 2025
d23151e
Phase 2: Implement offset-based query optimization with runtime switc…
halconel Oct 8, 2025
926c00a
Phase 3: Add automatic offset updates after successful processing
halconel Oct 8, 2025
1ddef28
Add safe error handling for missing offset table
halconel Oct 8, 2025
5c8968e
Phase 4: Add offset initialization and fix delete tracking
halconel Oct 9, 2025
753df1d
Phase 5: Add performance tests with scalability analysis
halconel Oct 10, 2025
f6e5cc0
Fix CI errors: type annotations, SQL compatibility, and test assertions
halconel Oct 10, 2025
1e711ca
Refactor init_offsets docstring
halconel Oct 10, 2025
565f10d
Refactor optimization flag checking into helper methods
halconel Oct 10, 2025
979220b
Fix race condition in offset updates with atomic max operation
halconel Oct 10, 2025
120606e
[Looky-7769] fix: Add keep_existing=True to TransformMetaTable and en…
halconel Oct 13, 2025
6a7d084
[Looky-7769] docs: Update CHANGELOG with fixes from looky-crunch branch
halconel Oct 13, 2025
8a0cb2a
[Looky-7769] fix: Fix mypy type error for password parameter in Redis…
halconel Oct 13, 2025
e29fc35
Add TODO comment for future code removal
elephantum Oct 14, 2025
4fcb686
fix actions rules for from-the-fork PRs
elephantum Oct 14, 2025
e103943
Merge pull request #357 from halconel/Looky-7769/looky-crunch-fixes
elephantum Oct 14, 2025
e31b544
v0.14.5
elephantum Oct 14, 2025
2d959b2
fix version to v0.14.5
elephantum Oct 14, 2025
5f0bc64
[Looky-7769] fix: Add backward compatibility for DataTable API in Bat…
halconel Oct 15, 2025
029971f
[Looky-7769] fix: Fix linter errors and update CHANGELOG
halconel Oct 15, 2025
aede112
Merge branch 'master' into Looky-7769/fix-datatable-dt-attribute-error
halconel Oct 15, 2025
268136b
[Looky-7769] fix: Replace hasattr with isinstance and update type ann…
halconel Oct 15, 2025
dd59490
[Looky-7769] fix: Remove unnecessary cast and type ignore comment
halconel Oct 15, 2025
9553df6
[Looky-7769] fix: Use Sequence type and normalize input_dts for prope…
halconel Oct 15, 2025
30643d1
fix py3.9 compatibility
elephantum Oct 15, 2025
ec94889
mv changelog entry to new version
elephantum Oct 15, 2025
7292c74
increment version
elephantum Oct 15, 2025
03fa8d8
*
elephantum Oct 15, 2025
fbc726e
Merge pull request #358 from halconel/Looky-7769/fix-datatable-dt-att…
elephantum Oct 15, 2025
46285d3
Merge branch 'master' into feature/qdrant-force-to-ram
elephantum Oct 15, 2025
050f141
v0.14.6a2
elephantum Oct 15, 2025
01238b7
Merge pull request #350 from epoch8/feature/qdrant-force-to-ram
elephantum Oct 15, 2025
fae5b72
Merge remote-tracking branch 'origin/master' into pr/halconel/356
elephantum Oct 15, 2025
69b02ec
[Looky-7769] fix: allow DataTable objects directly in ComputeStep inp…
halconel Oct 23, 2025
58f6a54
[Looky-7769] docs: update CHANGELOG for ComputeStep input_dts fix
halconel Oct 23, 2025
1a847d0
Merge pull request #360 from halconel/Looky-7769/fix-compute-step-inp…
elephantum Oct 23, 2025
d796510
v0.14.6-alpha.3
elephantum Oct 23, 2025
93f2685
[Looky-7769] fix: pandas merge performance by filtered join
halconel Oct 27, 2025
ed77858
[Looky-7769] fix: include join_keys columns in idx for filtered join …
halconel Oct 27, 2025
40f2194
Changed calculation checksum and added storing pandas DataFrame to pa…
b0r3y Oct 28, 2025
5827559
Chore
b0r3y Oct 28, 2025
497adfa
[Looky-7769] feat: add comprehensive tests for multi-table filtered j…
halconel Oct 28, 2025
99353dc
[Looky-7769] fix: join with data-table to reach additional_columns
halconel Oct 28, 2025
740a1d6
lint + test fix
b0r3y Oct 28, 2025
ebf1bff
test fix
b0r3y Oct 28, 2025
95a2341
[Looky-7769] fix: implement reverse join for reference tables in filt…
halconel Oct 28, 2025
a45b918
[Looky-7769] fix: add type annotation for error_select_cols in sql_meta
halconel Oct 28, 2025
a6047fe
Added hash function to PILFile and BytesFile
b0r3y Oct 29, 2025
2eb4e1a
Lint fix
b0r3y Oct 29, 2025
e9e4533
Add examples for parquet
b0r3y Oct 29, 2025
183a66a
[Looky-7769] fix: create offsets for JoinSpec tables with join_keys d…
halconel Oct 29, 2025
f794999
[Looky-7769] feat: add test for three-table filtered join with v1 vs …
halconel Oct 29, 2025
3615768
rm use_adapter_hash parameter
elephantum Nov 4, 2025
8083b6b
*
elephantum Nov 4, 2025
04cbf0b
*
elephantum Nov 4, 2025
47f7ec6
Merge pull request #3 from halconel/Looky-7769/offsets-hybrid
halconel Nov 20, 2025
11efa84
Refactor ItemStoreFileAdapter hash function
b0r3y Nov 25, 2025
93369ed
Refactor ItemStoreFileAdapter hash function
b0r3y Nov 25, 2025
26ea05c
fix lint
b0r3y Nov 25, 2025
d5c4615
*
elephantum Nov 25, 2025
fa295e8
changelog
elephantum Nov 25, 2025
f69a1f0
Merge pull request #361 from epoch8:added-pandas-parquet
elephantum Nov 25, 2025
da879f2
source /home/elephantum/Epoch8/Datapipe/datapipe/.venv/bin/activate
elephantum Nov 25, 2025
8b76fd5
Add use_offset_optimization field to BatchTransform and DatatableBatc…
halconel Dec 2, 2025
f258a5b
[Looky-7769] fix: add comprehensive test suite and documentation for …
halconel Dec 11, 2025
3d91c56
[Looky-7769] fix: implement ORDER BY update_ts to prevent data loss i…
halconel Dec 11, 2025
7ccbc38
[Looky-7769] fix: change strict inequality to >= and add process_ts f…
halconel Dec 12, 2025
93eb568
[Looky-7769] fix: add warning when store_chunk is called with past ti…
halconel Dec 12, 2025
ab5db8f
[Looky-7769] docs: add offset optimization documentation and remove t…
halconel Dec 12, 2025
6fb128d
[Looky-7769] fix: correct test expectations in offset edge cases tests
halconel Dec 12, 2025
a83cf78
[Looky-7769] fix: literals are restricted in GROUP BY clause
halconel Dec 12, 2025
5a0eeae
[Looky-7769] fix: ensure atomic offset commit at end of run_full by p…
halconel Dec 17, 2025
d93d3bc
[Looky-7769] fix: rename variables in ChangeList.extend to resolve my…
halconel Dec 17, 2025
c297a00
[Looky-7769] fix: increase timing delays in flaky tests and skip conc…
halconel Dec 17, 2025
21a21e9
[Looky-7769] fix: skip custom ordering test for SQLite due to NULLS L…
halconel Dec 17, 2025
c050248
doc: add comprehensive offset optimization documentation
halconel Dec 25, 2025
e299b4b
doc: split offset optimization documentation into separate feature files
halconel Dec 25, 2025
f1d7e57
feat: add unique CTE naming for reusing same table with different joi…
halconel Dec 27, 2025
f89a094
feat: fix deleted records not being removed from output when using jo…
halconel Dec 27, 2025
69d422f
docs: remove offset optimization explanation files that should only e…
halconel Dec 27, 2025
3e9ece4
refactor: simplify verbose comments in offset optimization code
halconel Dec 27, 2025
c5bbc93
feat: add deduplication and cross join support for offset optimizatio…
halconel Dec 27, 2025
63e0cbd
refactor: reorganize offset optimization functions with symmetric nam…
halconel Dec 28, 2025
8743a43
fix: apply epsilon-adjusted offset in WHERE clause to prevent data lo…
halconel Dec 30, 2025
a4a9877
perf: replace OR with UNION in offset WHERE clauses to enable index u…
halconel Dec 30, 2025
598841b
fix: add table qualification to filters_idx WHERE clause to resolve a…
halconel Jan 27, 2026
58877fa
test: add tests for reverse join with filters_idx to verify ambiguous…
halconel Jan 27, 2026
2d16d61
fix: add type annotation for select_cols to satisfy mypy
halconel Jan 27, 2026
be21ff5
fix: pass tbl=None for JOIN queries in filters_idx to avoid incorrect…
halconel Jan 27, 2026
b3e7cc5
fix: filter out NaN values from join_keys before SQL IN clause to pre…
halconel Jan 27, 2026
554f7bd
fix: add label() to all qualified column names in JOIN SELECT to prev…
halconel Jan 28, 2026
5b1a19a
fix: disable offset optimization for changelist processing to prevent…
halconel Jan 28, 2026
43eaf2c
test: add production case for advertising campaign moderation with co…
halconel Jan 28, 2026
0e5b668
fix: use offset=0 for changelist to support reverse join and fix ambi…
halconel Jan 28, 2026
cb495f2
Revert "fix: disable offset optimization for changelist processing to…
halconel Jan 28, 2026
a1f1963
fix: use input idx for offset calculation when aggregation removes tr…
halconel Jan 29, 2026
cf8b966
perf: replace UNION with UNION ALL and exclude duplicates in WHERE to…
halconel Jan 30, 2026
4a0d33a
fix: filter NaN values in sql_apply_idx_filter_to_table to prevent VA…
halconel Jan 30, 2026
61be4a9
feat: add offset diagnostic logging and max_records_per_run limit to …
halconel Feb 4, 2026
5563813
fix: add missing max_records_per_run parameter to BatchTransformStep.…
halconel Feb 6, 2026
53e3ea3
fix: preserve composite join key pairs in filtered join
halconel Apr 15, 2026
9 changes: 9 additions & 0 deletions .github/workflows/lint.yaml
@@ -2,6 +2,15 @@ name: Run linters

on:
push:
branches:
- "master"
paths:
- ".github/workflows/test.yaml"
- "datapipe/**"
- "tests/**"
- "pyproject.toml"

pull_request:
branches:
- "**"
paths:
11 changes: 10 additions & 1 deletion .github/workflows/test.yaml
@@ -2,6 +2,15 @@ name: Run tests

on:
push:
branches:
- "master"
paths:
- ".github/workflows/test.yaml"
- "datapipe/**"
- "tests/**"
- "pyproject.toml"

pull_request:
branches:
- "**"
paths:
@@ -111,7 +120,7 @@ jobs:

- name: Install dependencies
run: |
pip install ${{ matrix.pip-extra }} ".[sqlite,excel,milvus,gcsfs,s3fs,redis,qdrant,gcp,elastic]" "pytest<8" "pytest_cases" "elasticsearch<9"
pip install ${{ matrix.pip-extra }} ".[sqlite,excel,milvus,gcsfs,s3fs,redis,qdrant,gcp,elastic,pyarrow]" "pytest<8" "pytest_cases" "elasticsearch<9"

- name: Test with pytest
run: |
9 changes: 9 additions & 0 deletions .github/workflows/test_examples.yaml
@@ -2,6 +2,15 @@ name: Test examples

on:
push:
branches:
- "master"
paths:
- ".github/workflows/test.yaml"
- "datapipe/**"
- "tests/**"
- "pyproject.toml"

pull_request:
branches:
- "**"
paths:
19 changes: 18 additions & 1 deletion CHANGELOG.md
@@ -1,6 +1,23 @@
# WIP
# WIP: 0.14.6

* Allow `DataTable` objects directly in `ComputeStep.input_dts` parameter
(auto-wraps in `ComputeInput`)
* Add backward compatibility for DataTable API in BatchTransformStep
* Add optional `force_vectors_to_ram` parameter to `QdrantStore`
* Moved checksum calculation from `MetaTable` to `TableStore`
* Added PandasParquetFile adapter for TableStoreFiledir to store pandas
DataFrames in parquet
* Added hash_row for BytesFile and PILFile
* Added use_adapter_hash parameter in TableStoreFiledir for backward
compatibility

# 0.14.5

* Fix for getting existing idx of empty meta-table
* Add `keep_existing=True` to `TransformMetaTable` to prevent table metadata
conflicts
* Enhance `RedisStore` with multi-node cluster support and password
authentication

# 0.14.4

67 changes: 67 additions & 0 deletions datapipe/cli.py
@@ -498,6 +498,73 @@ def migrate_transform_tables(ctx: click.Context, labels: str, name: str) -> None
return migrations_v013.migrate_transform_tables(app, batch_transforms_steps)


@cli.command()
@click.option("--step", type=click.STRING, help="Step name to initialize offsets for (optional)")
@click.pass_context
def init_offsets(ctx: click.Context, step: Optional[str]) -> None:
"""
Initialize the offset table from existing TransformMetaTable data.

The command scans already-processed data and sets initial offset values so
that migration to offset-based query optimization (the v2 method) is smooth.

If --step is given, only that step is initialized. Otherwise all
BatchTransformStep instances in the pipeline are initialized.
"""
from datapipe.meta.sql_meta import initialize_offsets_from_transform_meta

app: DatapipeApp = ctx.obj["app"]

# Collect all BatchTransformStep instances
transform_steps = []
for compute_step in app.steps:
if isinstance(compute_step, BaseBatchTransformStep):
if step is None or compute_step.get_name() == step:
transform_steps.append(compute_step)

if not transform_steps:
if step:
rprint(f"[red]Step '{step}' not found or is not a BatchTransformStep[/red]")
else:
rprint("[yellow]No BatchTransformStep instances found in pipeline[/yellow]")
return

rprint(f"[cyan]Found {len(transform_steps)} transform step(s) to initialize[/cyan]")

# Initialize offsets for each step
results = {}
for transform_step in transform_steps:
step_name = transform_step.get_name()
rprint(f"\n[cyan]Initializing offsets for: {step_name}[/cyan]")

try:
offsets = initialize_offsets_from_transform_meta(app.ds, transform_step)

if offsets:
rprint(f"[green]✓ Initialized {len(offsets)} offset(s):[/green]")
for input_name, offset_value in offsets.items():
rprint(f" - {input_name}: {offset_value}")
results[step_name] = offsets
else:
rprint("[yellow]No offsets initialized (no processed data found)[/yellow]")
results[step_name] = {}

except Exception as e:
rprint(f"[red]✗ Failed to initialize: {e}[/red]")
results[step_name] = None  # None marks failure; {} is reserved for "no data"

# Summary
rprint("\n[cyan]═══ Summary ═══[/cyan]")
success_count = sum(1 for v in results.values() if v is not None and len(v) > 0)
empty_count = sum(1 for v in results.values() if v is not None and len(v) == 0)
failed_count = sum(1 for v in results.values() if v is None)

rprint(f"[green]Successful: {success_count}[/green]")
rprint(f"[yellow]Empty (no data): {empty_count}[/yellow]")
if failed_count > 0:
rprint(f"[red]Failed: {failed_count}[/red]")


try:
entry_points = metadata.entry_points(group="datapipe.cli") # type: ignore
except TypeError:
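The summary above distinguishes three outcomes per step by the value recorded in `results`: a non-empty dict (offsets initialized), an empty dict (no processed data), and `None` (failed). A standalone sketch of that counting convention (names are illustrative, not the datapipe API); note the failed bucket only ever fires if the `except` branch records `None` rather than an empty dict:

```python
from typing import Dict, Optional

# Convention: None = failed, {} = no data found, non-empty dict = offsets set.
def summarize(results: Dict[str, Optional[Dict[str, float]]]) -> Dict[str, int]:
    return {
        "successful": sum(1 for v in results.values() if v is not None and len(v) > 0),
        "empty": sum(1 for v in results.values() if v is not None and len(v) == 0),
        "failed": sum(1 for v in results.values() if v is None),
    }

counts = summarize(
    {
        "step_a": {"input_1": 1700000000.0},  # offsets initialized
        "step_b": {},  # no processed data found
        "step_c": None,  # initialization raised
    }
)
print(counts)  # {'successful': 1, 'empty': 1, 'failed': 1}
```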
13 changes: 10 additions & 3 deletions datapipe/compute.py
@@ -2,7 +2,7 @@
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Iterable, List, Literal, Optional, Sequence, Tuple
from typing import Dict, Iterable, List, Literal, Optional, Sequence, Tuple, Union

from opentelemetry import trace

@@ -85,6 +85,9 @@ class StepStatus:
class ComputeInput:
dt: DataTable
join_type: Literal["inner", "full"] = "full"
# Filtered join optimization: mapping from idx columns to dt columns
# Example: {"user_id": "id"} means filter dt by dt.id IN (idx.user_id)
join_keys: Optional[Dict[str, str]] = None
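The `join_keys` mapping above drives the filtered-join optimization referenced throughout the commit history. A hedged pandas sketch of the semantics (the real implementation emits SQL; the helper name here is illustrative):

```python
import pandas as pd

def apply_join_keys(idx: pd.DataFrame, dt: pd.DataFrame, join_keys: dict) -> pd.DataFrame:
    """Keep only dt rows whose dt_col value appears in idx's idx_col column."""
    out = dt
    for idx_col, dt_col in join_keys.items():
        # Drop NaN before the membership test, mirroring the NaN filtering
        # added for SQL IN clauses in the commit history.
        values = idx[idx_col].dropna().unique()
        out = out[out[dt_col].isin(values)]
    return out

idx = pd.DataFrame({"user_id": [1, 2]})
dt = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"]})
filtered = apply_join_keys(idx, dt, {"user_id": "id"})
print(filtered["name"].tolist())  # ['a', 'b']
```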


class ComputeStep:
@@ -106,13 +109,17 @@ class ComputeStep:
def __init__(
self,
name: str,
input_dts: List[ComputeInput],
input_dts: Sequence[Union[ComputeInput, DataTable]],
output_dts: List[DataTable],
labels: Optional[Labels] = None,
executor_config: Optional[ExecutorConfig] = None,
) -> None:
self._name = name
self.input_dts = input_dts
# Normalize input_dts: automatically wrap bare DataTable objects in ComputeInput
self.input_dts = [
inp if isinstance(inp, ComputeInput) else ComputeInput(dt=inp, join_type="full")
for inp in input_dts
]
self.output_dts = output_dts
self._labels = labels
self.executor_config = executor_config
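The normalization in `__init__` above lets callers pass either bare `DataTable` objects or explicit `ComputeInput` wrappers. The idiom in isolation, with stand-in types (not the real datapipe classes):

```python
from dataclasses import dataclass
from typing import List, Sequence, Union

@dataclass
class Table:  # stand-in for DataTable
    name: str

@dataclass
class Input:  # stand-in for ComputeInput
    dt: Table
    join_type: str = "full"

def normalize(inputs: Sequence[Union[Input, Table]]) -> List[Input]:
    # Canonicalize: wrap bare tables, pass explicit wrappers through unchanged.
    return [inp if isinstance(inp, Input) else Input(dt=inp) for inp in inputs]

normalized = normalize([Table("users"), Input(Table("orders"), join_type="inner")])
print([(i.dt.name, i.join_type) for i in normalized])
# [('users', 'full'), ('orders', 'inner')]
```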
18 changes: 13 additions & 5 deletions datapipe/datatable.py
@@ -5,11 +5,11 @@
from opentelemetry import trace

from datapipe.event_logger import EventLogger
from datapipe.meta.sql_meta import MetaTable
from datapipe.meta.sql_meta import MetaTable, TransformInputOffsetTable
from datapipe.run_config import RunConfig
from datapipe.store.database import DBConn
from datapipe.store.table_store import TableStore
from datapipe.types import DataDF, IndexDF, MetadataDF, data_to_index, index_difference
from datapipe.types import DataDF, IndexDF, MetadataDF, data_to_index, index_difference, index_to_data

if TYPE_CHECKING:
try:
@@ -87,13 +87,18 @@ def store_chunk(
if not data_df.empty:
logger.debug(f"Inserting chunk {len(data_df.index)} rows into {self.name}")

hash_df = self.table_store.hash_rows(data_df)

with tracer.start_as_current_span("get_changes_for_store_chunk"):
(
new_df,
changed_df,
new_index_df,
changed_index_df,
new_meta_df,
changed_meta_df,
) = self.meta_table.get_changes_for_store_chunk(data_df, now)
) = self.meta_table.get_changes_for_store_chunk(hash_df, now)

new_df = index_to_data(data_df, new_index_df)
changed_df = index_to_data(data_df, changed_index_df)

# TODO implement transaction mechanism
with tracer.start_as_current_span("store data"):
@@ -165,6 +170,9 @@ def __init__(

self.create_meta_table = create_meta_table

# Create the offset table (reuse the same create_meta_table flag)
self.offset_table = TransformInputOffsetTable(meta_dbconn, create_table=create_meta_table)

def create_table(self, name: str, table_store: TableStore) -> DataTable:
assert name not in self.tables

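The reworked `store_chunk` above hashes incoming rows in the table store, asks the meta table only for the *indices* of new and changed rows, and then slices the original data by those indices. A standalone pandas sketch of that flow (`hash_rows` and `index_to_data` appear in the diff, but the bodies here are simplified illustrations):

```python
import hashlib

import pandas as pd

def hash_rows(df: pd.DataFrame, keys: list) -> pd.DataFrame:
    """Return key columns plus a content hash of the non-key columns."""
    payload = df.drop(columns=keys)
    hashes = payload.apply(lambda r: hashlib.md5(repr(tuple(r)).encode()).hexdigest(), axis=1)
    return df[keys].assign(hash=hashes)

def index_to_data(df: pd.DataFrame, idx: pd.DataFrame) -> pd.DataFrame:
    """Slice df down to the rows identified by idx (an inner join on key columns)."""
    return df.merge(idx, on=list(idx.columns))

data = pd.DataFrame({"id": [1, 2, 3], "value": ["a", "B", "c"]})
hash_df = hash_rows(data, ["id"])

# Pretend the meta table already knows id=1 (same content) and id=2 (stale content).
existing = hash_rows(pd.DataFrame({"id": [1, 2], "value": ["a", "b"]}), ["id"])

merged = hash_df.merge(existing, on="id", how="left", suffixes=("", "_old"))
new_index = merged[merged["hash_old"].isna()][["id"]]
changed_index = merged[merged["hash_old"].notna() & (merged["hash"] != merged["hash_old"])][["id"]]

new_df = index_to_data(data, new_index)          # only id=3 is unseen
changed_df = index_to_data(data, changed_index)  # id=2 changed ("b" -> "B")
print(new_df["id"].tolist(), changed_df["id"].tolist())  # [3] [2]
```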