95 changes: 74 additions & 21 deletions datapipe/meta/sql_meta.py
@@ -253,8 +253,16 @@ def get_changes_for_store_chunk(
changed_meta_df - metadata rows that need to be changed
"""

current_time = time.time()
if now is None:
now = time.time()
now = current_time
elif now < current_time - 1.0:  # 1-second threshold - ignore sub-second differences
# Warning: passing a timestamp from the past may lead to data loss
# when offset optimization is used (Hypothesis 4: delayed records)
logger.warning(
f"store_chunk called with now={now:.2f} which is {current_time - now:.2f}s in the past. "
f"This may cause data loss with offset optimization if offset > now."
)

# get the metadata for the chunk
existing_meta_df = self.get_metadata(hash_to_index(hash_df, self.primary_keys), include_deleted=True)
@@ -870,6 +878,11 @@ def build_changed_idx_sql_v2(
# Full list of columns for the SELECT (transform_keys + additional_columns)
all_select_keys = list(transform_keys) + additional_columns

# Add update_ts for ORDER BY if it is not already present
# (needed to sort batches correctly by update time)
if 'update_ts' not in all_select_keys:
all_select_keys.append('update_ts')

# 1. Fetch all offsets with a single query to avoid N+1 queries
offsets = offset_table.get_offsets_for_transformation(transformation_id)
# For tables without an offset, use 0.0 (process all data)
@@ -933,15 +946,15 @@ def build_changed_idx_sql_v2(

# SELECT primary_cols FROM reference_meta
# JOIN primary_data ON primary.join_key = reference.id
# WHERE reference.update_ts > offset
# WHERE reference.update_ts >= offset (use >= instead of >)
changed_sql = sa.select(*select_cols).select_from(
tbl.join(primary_data_tbl, join_condition)
).where(
sa.or_(
tbl.c.update_ts > offset,
tbl.c.update_ts >= offset,
sa.and_(
tbl.c.delete_ts.isnot(None),
tbl.c.delete_ts > offset
tbl.c.delete_ts >= offset
)
)
)
if len(keys_in_data_only) == 0:
select_cols = [sa.column(k) for k in keys_in_meta]

# SELECT keys FROM input_meta WHERE update_ts > offset OR delete_ts > offset
# SELECT keys FROM input_meta WHERE update_ts >= offset OR delete_ts >= offset
changed_sql = sa.select(*select_cols).select_from(tbl).where(
sa.or_(
tbl.c.update_ts > offset,
tbl.c.update_ts >= offset,
sa.and_(
tbl.c.delete_ts.isnot(None),
tbl.c.delete_ts > offset
tbl.c.delete_ts >= offset
)
)
)
select_cols = [sa.column(k) for k in keys_in_meta]
changed_sql = sa.select(*select_cols).select_from(tbl).where(
sa.or_(
tbl.c.update_ts > offset,
tbl.c.update_ts >= offset,
sa.and_(
tbl.c.delete_ts.isnot(None),
tbl.c.delete_ts > offset
tbl.c.delete_ts >= offset
)
)
)
select_cols = [sa.column(k) for k in keys_in_meta]
changed_sql = sa.select(*select_cols).select_from(tbl).where(
sa.or_(
tbl.c.update_ts > offset,
tbl.c.update_ts >= offset,
sa.and_(
tbl.c.delete_ts.isnot(None),
tbl.c.delete_ts > offset
tbl.c.delete_ts >= offset
)
)
)
changed_sql = sql_apply_filters_idx_to_subquery(changed_sql, keys_in_meta, filters_idx)
changed_sql = sql_apply_runconfig_filter(changed_sql, tbl, inp.dt.primary_keys, run_config)
if len(select_cols) > 0:
changed_sql = changed_sql.group_by(*select_cols)

changed_ctes.append(changed_sql.cte(name=f"{inp.dt.name}_changes"))
continue

tbl.join(data_tbl, join_condition)
).where(
sa.or_(
tbl.c.update_ts > offset,
tbl.c.update_ts >= offset,
sa.and_(
tbl.c.delete_ts.isnot(None),
tbl.c.delete_ts > offset
tbl.c.delete_ts >= offset
)
)
)
# Important: error_records must have every column from all_select_keys for the UNION
# For additional_columns use NULL, since they are not present in the transform meta table
tr_tbl = meta_table.sql_table
error_select_cols: List[Any] = [sa.column(k) for k in transform_keys] + [
sa.literal(None).label(k) for k in additional_columns
]
# For error_records, the columns need to be built from all_select_keys
# Columns from transform_keys come from tr_tbl, the rest are NULL
error_select_cols: List[Any] = []
for k in all_select_keys:
if k in transform_keys:
error_select_cols.append(sa.column(k))
else:
# For additional columns (including update_ts) use NULL with the correct type
# update_ts is Float, the rest are String
if k == 'update_ts':
error_select_cols.append(sa.cast(sa.literal(None), sa.Float).label(k))
else:
error_select_cols.append(sa.literal(None).label(k))

error_records_sql: Any = sa.select(*error_select_cols).select_from(tr_tbl).where(
sa.or_(
tr_tbl.c.is_success != True, # noqa
# 4. Combine all changes and errors via UNION
if len(changed_ctes) == 0:
# If there are no input tables with changes, use only the errors
union_sql: Any = sa.select(*[error_records_cte.c[k] for k in all_select_keys]).select_from(error_records_cte)
union_sql: Any = sa.select(
*[error_records_cte.c[k] for k in all_select_keys]
).select_from(error_records_cte)
else:
# UNION of all changes and errors
# Important: the UNION must include every column from all_select_keys
# For missing columns use NULL
union_parts = []
for cte in changed_ctes:
union_parts.append(sa.select(*select_cols).select_from(cte))

union_parts.append(
sa.select(*[error_records_cte.c[k] for k in all_select_keys]).select_from(error_records_cte)
sa.select(
*[error_records_cte.c[k] for k in all_select_keys]
).select_from(error_records_cte)
)

union_sql = sa.union(*union_parts)

# Use `out` for consistency with v1
# Important: include all columns (transform_keys + additional_columns)

# Error records have update_ts = NULL; use that to identify them
is_error_record = union_cte.c.update_ts.is_(None)

out = (
sa.select(
sa.literal(1).label("_datapipe_dummy"),
*[union_cte.c[k] for k in all_select_keys if k in union_cte.c]
*[union_cte.c[k] for k in all_select_keys if k in union_cte.c],
)
.select_from(union_cte)
.outerjoin(tr_tbl, onclause=join_onclause_sql)
.where(
# Filtering to prevent an infinite loop with the >= offset condition
# Logic mirrors v1, but accounts for error_records
sa.or_(
# Error records (update_ts IS NULL) - always process
is_error_record,
# Never processed (first time)
tr_tbl.c.process_ts.is_(None),
# Processed successfully, but the data was updated after processing
sa.and_(
tr_tbl.c.is_success == True, # noqa
union_cte.c.update_ts > tr_tbl.c.process_ts
)
# Note: is_success != True is NOT checked here, because
# failed records are already included in the error_records CTE
)
)
)

if order_by is None:
# Ordering: first by update_ts (for consistency with the offset),
# then by transform_keys (for determinism)
# NULLS LAST - error_records (with update_ts = NULL) are processed last
out = out.order_by(
tr_tbl.c.priority.desc().nullslast(),
*[union_cte.c[k] for k in transform_keys],
union_cte.c.update_ts.asc().nullslast(),  # order by update time, NULLs last
*[union_cte.c[k] for k in transform_keys],  # determinism for equal update_ts values
)
else:
if order == "desc":
1 change: 1 addition & 0 deletions docs/source/SUMMARY.md
@@ -7,6 +7,7 @@
- [Table](./concepts-table.md)
- [Transform](./concepts-transform.md)
- [How merging works](./how-merging-works.md)
- [Offset Optimization](./offset-optimization.md)

# Command Line Interface

167 changes: 167 additions & 0 deletions docs/source/offset-optimization.md
@@ -0,0 +1,167 @@
# Offset Optimization

Offset optimization is a feature that improves performance of incremental processing by tracking the last processed timestamp (offset) for each input table in a transformation. This allows Datapipe to skip already-processed records without scanning the entire transformation metadata table.
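
Conceptually, one offset is kept per input table of a transformation. The sketch below is illustrative plain Python rather than the datapipe API; it mirrors the fallback used in this PR's `build_changed_idx_sql_v2`: a table without a stored offset falls back to `0.0`, so all of its rows are processed.

```python
# Illustrative sketch (not the datapipe API): one offset per input table of a
# transformation; a missing offset defaults to 0.0, i.e. "process everything".
offsets: dict[str, float] = {"events": 1_700_000_100.0}  # table name -> last processed update_ts

def offset_for(table_name: str) -> float:
    # Tables without a stored offset are processed from the beginning.
    return offsets.get(table_name, 0.0)

print(offset_for("events"))  # 1700000100.0 -> only rows with update_ts >= this are scanned
print(offset_for("users"))   # 0.0          -> all rows are scanned
```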

## How It Works

### Without Offset Optimization (v1)

The traditional approach (v1) uses a FULL OUTER JOIN between input tables and transformation metadata:

```sql
SELECT transform_keys
FROM input_table
FULL OUTER JOIN transform_meta ON transform_keys
WHERE input.update_ts > transform_meta.process_ts
OR transform_meta.is_success != True
```

This approach:
- ✅ Always correct - finds all records that need processing
- ❌ Scans entire transformation metadata table on every run
- ❌ Performance degrades as metadata grows

### With Offset Optimization (v2)

The optimized approach (v2) uses per-input-table offsets to filter data early:

```sql
-- For each input table, filter by offset first
WITH input_changes AS (
SELECT transform_keys, update_ts
FROM input_table
WHERE update_ts >= :offset -- Early filtering by offset
),
error_records AS (
SELECT transform_keys
FROM transform_meta
WHERE is_success != True
)
-- Union all changes, then check process_ts to avoid reprocessing
SELECT transform_keys
FROM (
    SELECT transform_keys, update_ts FROM input_changes
    UNION ALL
    SELECT transform_keys, NULL AS update_ts FROM error_records
) AS all_changes
LEFT JOIN transform_meta ON transform_keys
WHERE update_ts IS NULL                                -- Error records
   OR process_ts IS NULL                               -- Never processed
   OR (is_success = True AND update_ts > process_ts)   -- Updated after processing
ORDER BY update_ts, transform_keys
```

This approach:
- ✅ Filters most records early using index on `update_ts`
- ✅ Only scans records with `update_ts >= offset`
- ✅ Performance stays constant regardless of metadata size
- ⚠️ Requires careful implementation to avoid data loss

## Key Implementation Details

### 1. Inclusive Inequality (`>=` not `>`)

The offset filter must use `>=` instead of `>`:

```sql
-- Correct
WHERE update_ts >= offset

-- Wrong: loses records with update_ts == offset
WHERE update_ts > offset
```
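
To see why the strict comparison loses data, consider two rows that share the same `update_ts` while the offset has already been advanced to that timestamp (for example, a previous run stopped after processing only the first of them). The snippet below is a self-contained illustration in plain Python, not datapipe code:

```python
# Illustrative only: two rows share the same update_ts; the offset was advanced
# to that timestamp after row "a" was processed, so ">" silently drops row "b".
rows = [
    {"id": "a", "update_ts": 100.0},
    {"id": "b", "update_ts": 100.0},
]
offset = 100.0  # advanced after processing row "a"

strict = [r["id"] for r in rows if r["update_ts"] > offset]      # [] - row "b" is lost
inclusive = [r["id"] for r in rows if r["update_ts"] >= offset]  # ["a", "b"]
print(strict, inclusive)
```

With `>=`, row `a` is selected again as well, but the `process_ts` check described in the next section filters it out, so nothing is reprocessed unnecessarily.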

### 2. Process Timestamp Check

After filtering by offset, we must check `process_ts` to avoid reprocessing:

```sql
WHERE (
    update_ts IS NULL                                   -- Error records (always process)
    OR process_ts IS NULL                               -- Never processed
    OR (is_success = True AND update_ts > process_ts)   -- Updated after last processing
)
```

This prevents infinite loops when using `>=` offset.
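
The predicate can be restated as a plain Python function to make the loop-prevention argument explicit: a row that was re-selected by the `>=` filter but already processed successfully (and unchanged since) is dropped. This is an illustrative restatement, not the actual datapipe code:

```python
from typing import Optional

def needs_processing(update_ts: Optional[float],
                     process_ts: Optional[float],
                     is_success: Optional[bool]) -> bool:
    if update_ts is None:   # error record -> always process
        return True
    if process_ts is None:  # never processed before
        return True
    # processed before: only reprocess if it succeeded and the data changed later
    return bool(is_success) and update_ts > process_ts

# Re-selected by ">= offset" but already processed -> skipped, no infinite loop:
print(needs_processing(update_ts=100.0, process_ts=101.0, is_success=True))  # False
# Never processed -> picked up:
print(needs_processing(update_ts=100.0, process_ts=None, is_success=None))   # True
```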

### 3. Ordering

Results are ordered by `update_ts` first, then `transform_keys` for determinism:

```sql
ORDER BY update_ts, transform_keys
```

This ensures that:
- Records are processed in chronological order
- The offset accurately represents the last processed timestamp
- No records with earlier timestamps are skipped
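
For example, if a run stops partway through a batch, the new offset equals the `update_ts` of the last row that was actually processed; because the batch was sorted chronologically, every unprocessed row still satisfies `update_ts >= offset` on the next run. A minimal illustrative sketch in plain Python (not datapipe code):

```python
# Illustrative only: chronological ordering keeps the offset safe even when a
# run stops partway through a batch.
batch = sorted(
    [
        {"id": "c", "update_ts": 103.0},
        {"id": "a", "update_ts": 101.0},
        {"id": "b", "update_ts": 102.0},
    ],
    key=lambda r: r["update_ts"],
)
processed, remaining = batch[:2], batch[2:]  # suppose the run stops after two rows
new_offset = processed[-1]["update_ts"]      # 102.0
# every unprocessed row is still selected by the ">= offset" filter next time
assert all(r["update_ts"] >= new_offset for r in remaining)
```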

### 4. Error Records

Records that failed processing (`is_success != True`) are always included via a separate CTE, regardless of offset:

```sql
error_records AS (
SELECT transform_keys, NULL as update_ts
FROM transform_meta
WHERE is_success != True
)
```

Error records have `update_ts = NULL` to distinguish them from changed records.

## Enabling Offset Optimization

Offset optimization is controlled by the `use_offset_optimization` field in transform configuration:

```python
BatchTransform(
func=my_transform,
inputs=[input_table],
outputs=[output_table],
# Add this field to enable offset optimization
use_offset_optimization=True,
)
```

When enabled, Datapipe tracks offsets in the `offset_table` and uses them to optimize changelist queries.

## Important Considerations

### Timestamp Accuracy

The offset optimization relies on accurate timestamps. If you manually call `store_chunk()` with a `now` parameter that is in the past:

```python
# Warning: This may cause data loss with offset optimization!
dt.store_chunk(data, now=old_timestamp)
```

Datapipe will log a warning:

```
WARNING - store_chunk called with now=X which is Ys in the past.
This may cause data loss with offset optimization if offset > now.
```

In normal operation, `store_chunk()` uses the current time automatically, so this is not a concern unless you explicitly provide the `now` parameter.

### When to Use

Offset optimization is most beneficial when:
- ✅ Transformations have large metadata tables (many processed records)
- ✅ Incremental updates are small compared to total data
- ✅ Input tables have an index on `update_ts`

It may not help when:
- ❌ Processing all data on every run (full refresh)
- ❌ Metadata table is small (< 10k records)
- ❌ Most records are updated on every run

## See Also

- [How Merging Works](./how-merging-works.md) - Understanding the changelist query strategy
- [BatchTransform](./reference-batchtransform.md) - Transform configuration reference
- [Lifecycle of a ComputeStep](./transformation-lifecycle.md) - Transformation execution flow
6 changes: 3 additions & 3 deletions tests/docker-compose.yml
@@ -25,8 +25,8 @@ services:
environment:
discovery.type: single-node
ES_JAVA_OPTS: -Xms4g -Xmx4g
xpack.security.enabled: false
xpack.security.http.ssl.enabled: false
xpack.security.enabled: "false"
xpack.security.http.ssl.enabled: "false"
node.name: node
cluster.name: cluster
http.cors.enabled: true
http.cors.enabled: "true"