From 93f268523b79f2658e4a66c49a507fab3de31981 Mon Sep 17 00:00:00 2001 From: Dmitriy Vinogradov Date: Mon, 27 Oct 2025 20:53:23 +0300 Subject: [PATCH 1/8] [Looky-7769] fix: pandas merge performace by filttered join --- datapipe/compute.py | 3 ++ datapipe/step/batch_transform.py | 53 ++++++++++++++++++++++++++++++-- datapipe/types.py | 4 +++ 3 files changed, 58 insertions(+), 2 deletions(-) diff --git a/datapipe/compute.py b/datapipe/compute.py index 5b634796..02104b00 100644 --- a/datapipe/compute.py +++ b/datapipe/compute.py @@ -85,6 +85,9 @@ class StepStatus: class ComputeInput: dt: DataTable join_type: Literal["inner", "full"] = "full" + # Filtered join optimization: mapping from idx columns to dt columns + # Example: {"user_id": "id"} means filter dt by dt.id IN (idx.user_id) + join_keys: Optional[Dict[str, str]] = None class ComputeStep: diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index 6ec8142d..bdb7546d 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -588,7 +588,55 @@ def get_batch_input_dfs( idx: IndexDF, run_config: Optional[RunConfig] = None, ) -> List[DataDF]: - return [inp.dt.get_data(idx) for inp in self.input_dts] + """ + Получить входные данные для батча с поддержкой filtered join. + + Если у ComputeInput указаны join_keys, читаем только связанные записи + для оптимизации производительности. + """ + result = [] + + for inp in self.input_dts: + if inp.join_keys: + # FILTERED JOIN: Читаем только связанные записи + # Извлекаем уникальные значения foreign keys из idx + filtered_idx_data = {} + all_keys_present = True + + for idx_col, dt_col in inp.join_keys.items(): + if idx_col in idx.columns: + # Получаем уникальные значения и создаем маппинг + unique_values = idx[idx_col].unique() + filtered_idx_data[dt_col] = unique_values + else: + # Если хотя бы одного ключа нет - используем fallback + all_keys_present = False + break + + if all_keys_present and filtered_idx_data: + # Создаем filtered_idx для чтения только нужных записей + filtered_idx = IndexDF(pd.DataFrame(filtered_idx_data)) + + logger.debug( + f"[{self.get_name()}] Filtered join for {inp.dt.name}: " + f"reading {len(filtered_idx)} records instead of full table" + ) + + data = inp.dt.get_data(filtered_idx) + else: + # Fallback: если не все ключи присутствуют, читаем по idx + logger.debug( + f"[{self.get_name()}] Filtered join fallback for {inp.dt.name}: " + f"join_keys={inp.join_keys} not found in idx columns {list(idx.columns)}" + ) + data = inp.dt.get_data(idx) + else: + # Обычное чтение по idx + data = inp.dt.get_data(idx) + + result.append(data) + + return result def process_batch_dfs( self, @@ -817,12 +865,13 @@ def pipeline_input_to_compute_input(self, ds: DataStore, catalog: Catalog, input return ComputeInput( dt=catalog.get_datatable(ds, input.table), join_type="inner", + join_keys=input.join_keys, # Pass join_keys for filtered join ) elif isinstance(input, JoinSpec): - # This should not happen, but just in case return ComputeInput( dt=catalog.get_datatable(ds, input.table), join_type="full", + join_keys=input.join_keys, # Pass join_keys for filtered join ) else: return ComputeInput(dt=catalog.get_datatable(ds, input), join_type="full") diff --git a/datapipe/types.py b/datapipe/types.py index 9fb62685..76baa2f1 100644 --- a/datapipe/types.py +++ b/datapipe/types.py @@ -9,6 +9,7 @@ Dict, List, NewType, + Optional, Set, Tuple, Type, @@ -60,6 +61,9 @@ @dataclass class JoinSpec: table: TableOrName + # Filtered join optimization: mapping from idx columns to table columns + # Example: {"user_id": "id"} means filter table by table.id IN (idx.user_id) + join_keys: Optional[Dict[str, str]] = None @dataclass From ed7785852e6b074308b8d6067ff5cfda4546d0e0 Mon Sep 17 00:00:00 2001 From: Dmitriy Vinogradov Date: Mon, 27 Oct 2025 21:41:28 +0300 Subject: [PATCH 2/8] [Looky-7769] fix: include join_keys columns in idx for filtered join optimization --- datapipe/meta/sql_meta.py | 54 ++++++++++++++++++++++++-------- datapipe/step/batch_transform.py | 32 ++++++++++++++++++- 2 files changed, 72 insertions(+), 14 deletions(-) diff --git a/datapipe/meta/sql_meta.py b/datapipe/meta/sql_meta.py index 11933e33..4e87e23f 100644 --- a/datapipe/meta/sql_meta.py +++ b/datapipe/meta/sql_meta.py @@ -730,15 +730,27 @@ def build_changed_idx_sql_v1( order_by: Optional[List[str]] = None, order: Literal["asc", "desc"] = "asc", run_config: Optional[RunConfig] = None, # TODO remove + additional_columns: Optional[List[str]] = None, ) -> Tuple[Iterable[str], Any]: + """ + Args: + additional_columns: Дополнительные колонки для включения в результат (для filtered join) + """ + if additional_columns is None: + additional_columns = [] + + # Полный список колонок для SELECT (transform_keys + additional_columns) + all_select_keys = list(transform_keys) + additional_columns + all_input_keys_counts: Dict[str, int] = {} for col in itertools.chain(*[inp.dt.primary_schema for inp in input_dts]): all_input_keys_counts[col.name] = all_input_keys_counts.get(col.name, 0) + 1 inp_ctes = [] for inp in input_dts: + # Используем all_select_keys для включения дополнительных колонок keys, cte = inp.dt.meta_table.get_agg_cte( - transform_keys=transform_keys, + transform_keys=all_select_keys, filters_idx=filters_idx, run_config=run_config, ) @@ -746,7 +758,7 @@ def build_changed_idx_sql_v1( agg_of_aggs = _make_agg_of_agg( ds=ds, - transform_keys=transform_keys, + transform_keys=all_select_keys, ctes=inp_ctes, agg_col="update_ts", ) @@ -771,12 +783,14 @@ def build_changed_idx_sql_v1( else: # len(transform_keys) > 1: join_onclause_sql = sa.and_(*[agg_of_aggs.c[key] == out.c[key] for key in transform_keys]) + # Важно: Включаем все колонки (transform_keys + additional_columns) sql = ( sa.select( # Нам нужно выбирать хотя бы что-то, чтобы не было ошибки при # пустом transform_keys sa.literal(1).label("_datapipe_dummy"), - *[sa.func.coalesce(agg_of_aggs.c[key], out.c[key]).label(key) for key in transform_keys], + *[sa.func.coalesce(agg_of_aggs.c[key], out.c[key]).label(key) if key in transform_keys + else agg_of_aggs.c[key].label(key) for key in all_select_keys if key in agg_of_aggs.c], ) .select_from(agg_of_aggs) .outerjoin( @@ -811,7 +825,7 @@ def build_changed_idx_sql_v1( *[sa.asc(sa.column(k)) for k in order_by], out.c.priority.desc().nullslast(), ) - return (transform_keys, sql) + return (all_select_keys, sql) # Обратная совместимость: алиас для старой версии @@ -851,13 +865,22 @@ def build_changed_idx_sql_v2( order_by: Optional[List[str]] = None, order: Literal["asc", "desc"] = "asc", run_config: Optional[RunConfig] = None, + additional_columns: Optional[List[str]] = None, ) -> Tuple[Iterable[str], Any]: """ Новая версия build_changed_idx_sql, использующая offset'ы для оптимизации. Вместо FULL OUTER JOIN всех входных таблиц, выбираем только записи с update_ts > offset для каждой входной таблицы, затем объединяем через UNION. + + Args: + additional_columns: Дополнительные колонки для включения в результат (для filtered join) """ + if additional_columns is None: + additional_columns = [] + + # Полный список колонок для SELECT (transform_keys + additional_columns) + all_select_keys = list(transform_keys) + additional_columns # 1. Получить все offset'ы одним запросом для избежания N+1 offsets = offset_table.get_offsets_for_transformation(transformation_id) @@ -870,17 +893,18 @@ def build_changed_idx_sql_v2( changed_ctes = [] for inp in input_dts: tbl = inp.dt.meta_table.sql_table - keys = [k for k in transform_keys if k in inp.dt.primary_keys] + # Выбираем все ключи, которые есть в этой таблице + keys = [k for k in all_select_keys if k in inp.dt.primary_keys] if len(keys) == 0: continue - transform_key_cols: List[Any] = [sa.column(k) for k in keys] + select_cols: List[Any] = [sa.column(k) for k in keys] offset = offsets[inp.dt.name] - # SELECT transform_keys FROM input_meta WHERE update_ts > offset OR delete_ts > offset + # SELECT keys FROM input_meta WHERE update_ts > offset OR delete_ts > offset # Включаем как обновленные, так и удаленные записи - changed_sql: Any = sa.select(*transform_key_cols).select_from(tbl).where( + changed_sql: Any = sa.select(*select_cols).select_from(tbl).where( sa.or_( tbl.c.update_ts > offset, sa.and_( @@ -894,12 +918,13 @@ def build_changed_idx_sql_v2( changed_sql = sql_apply_filters_idx_to_subquery(changed_sql, keys, filters_idx) changed_sql = sql_apply_runconfig_filter(changed_sql, tbl, inp.dt.primary_keys, run_config) - if len(transform_key_cols) > 0: - changed_sql = changed_sql.group_by(*transform_key_cols) + if len(select_cols) > 0: + changed_sql = changed_sql.group_by(*select_cols) changed_ctes.append(changed_sql.cte(name=f"{inp.dt.name}_changes")) # 3. Получить записи с ошибками из TransformMetaTable + # Важно: error_records содержат только transform_keys, не additional_columns tr_tbl = meta_table.sql_table error_records_sql: Any = sa.select( *[sa.column(k) for k in transform_keys] @@ -925,9 +950,11 @@ def build_changed_idx_sql_v2( union_sql: Any = sa.select(*[error_records_cte.c[k] for k in transform_keys]).select_from(error_records_cte) else: # UNION всех изменений и ошибок + # Важно: UNION должен включать все колонки из all_select_keys union_parts = [] for cte in changed_ctes: - union_parts.append(sa.select(*[cte.c[k] for k in transform_keys if k in cte.c]).select_from(cte)) + # Выбираем только те колонки, которые есть в CTE + union_parts.append(sa.select(*[cte.c[k] for k in all_select_keys if k in cte.c]).select_from(cte)) union_parts.append( sa.select(*[error_records_cte.c[k] for k in transform_keys]).select_from(error_records_cte) @@ -947,10 +974,11 @@ def build_changed_idx_sql_v2( join_onclause_sql = sa.and_(*[union_cte.c[key] == tr_tbl.c[key] for key in transform_keys]) # Используем `out` для консистентности с v1 + # Важно: Включаем все колонки (transform_keys + additional_columns) out = ( sa.select( sa.literal(1).label("_datapipe_dummy"), - *[union_cte.c[k] for k in transform_keys] + *[union_cte.c[k] for k in all_select_keys if k in union_cte.c] ) .select_from(union_cte) .outerjoin(tr_tbl, onclause=join_onclause_sql) @@ -973,7 +1001,7 @@ def build_changed_idx_sql_v2( tr_tbl.c.priority.desc().nullslast(), ) - return (transform_keys, out) + return (all_select_keys, out) TRANSFORM_INPUT_OFFSET_SCHEMA: DataSchema = [ diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index bdb7546d..9ad4d768 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -172,6 +172,28 @@ def _get_optimization_method_name(self, run_config: Optional[RunConfig] = None) """ return "v2_offset" if self._get_use_offset_optimization(run_config) else "v1_join" + def _get_additional_idx_columns(self) -> List[str]: + """ + Собрать дополнительные колонки, необходимые для filtered join. + + Возвращает список колонок из join_keys, которые нужно включить в idx + для работы filtered join оптимизации. + + Returns: + Список имен колонок (без дубликатов) + """ + additional_columns = [] + + for inp in self.input_dts: + if inp.join_keys: + # Добавляем колонки из ключей join_keys (левая часть маппинга) + # Например, для {"user_id": "id"} добавляем "user_id" + for idx_col in inp.join_keys.keys(): + if idx_col not in self.transform_keys and idx_col not in additional_columns: + additional_columns.append(idx_col) + + return additional_columns + def _build_changed_idx_sql( self, ds: DataStore, @@ -189,6 +211,9 @@ def _build_changed_idx_sql( use_offset = self._get_use_offset_optimization(run_config) method = self._get_optimization_method_name(run_config) + # Получить дополнительные колонки для filtered join + additional_columns = self._get_additional_idx_columns() + with tracer.start_as_current_span(f"build_changed_idx_sql_{method}"): start_time = time.time() @@ -204,6 +229,7 @@ def _build_changed_idx_sql( order_by=order_by, order=order, run_config=run_config, + additional_columns=additional_columns, # Передаем дополнительные колонки ) else: keys, sql = build_changed_idx_sql_v1( @@ -215,6 +241,7 @@ def _build_changed_idx_sql( order_by=order_by, order=order, run_config=run_config, + additional_columns=additional_columns, # Передаем дополнительные колонки ) query_build_time = time.time() - start_time @@ -352,7 +379,10 @@ def alter_res_df(): with ds.meta_dbconn.con.begin() as con: for df in pd.read_sql_query(u1, con=con, chunksize=chunk_size): - df = df[self.transform_keys] + # Используем join_keys (которые включают transform_keys + additional_columns) + # Фильтруем только колонки, которые есть в df + available_keys = [k for k in join_keys if k in df.columns] + df = df[available_keys] for k, v in extra_filters.items(): df[k] = v From 497adfa73fa9a48986e91418042276e70cd08c63 Mon Sep 17 00:00:00 2001 From: Dmitriy Vinogradov Date: Tue, 28 Oct 2025 12:42:11 +0300 Subject: [PATCH 3/8] [Looky-7769] feat: add comprehensive tests for multi-table filtered join optimization --- tests/test_multi_table_filtered_join.py | 457 ++++++++++++++++++++++++ 1 file changed, 457 insertions(+) create mode 100644 tests/test_multi_table_filtered_join.py diff --git a/tests/test_multi_table_filtered_join.py b/tests/test_multi_table_filtered_join.py new file mode 100644 index 00000000..8202f898 --- /dev/null +++ b/tests/test_multi_table_filtered_join.py @@ -0,0 +1,457 @@ +""" +Тесты для проверки мульти-табличных трансформаций с filtered join оптимизацией. + +Тесты проверяют: +1. Что filtered join вызывается корректно при наличии join_keys +2. Что join выполняется по правильным ключам +3. Что результаты v1 (FULL OUTER JOIN) и v2 (offset-based) идентичны +""" +import time + +import pandas as pd +from sqlalchemy import Column, Integer, String + +from datapipe.compute import ComputeInput +from datapipe.datatable import DataStore +from datapipe.step.batch_transform import BatchTransformStep +from datapipe.store.database import DBConn, TableStoreDB + + +def test_filtered_join_is_called(dbconn: DBConn): + """ + Тест 1: Проверяет что filtered join вызывается и читает только нужные данные. + + Сценарий: + - Основная таблица users с user_id + - Справочник profiles с большим количеством записей + - join_keys указывает связь user_id -> id + - Проверяем что из profiles читаются только записи для существующих пользователей + """ + ds = DataStore(dbconn, create_meta_table=True) + + # Основная таблица users + users_store = TableStoreDB( + dbconn, + "users", + [ + Column("user_id", String, primary_key=True), + Column("name", String), + ], + create_table=True, + ) + users_dt = ds.create_table("users", users_store) + + # Справочная таблица profiles (много записей) + profiles_store = TableStoreDB( + dbconn, + "profiles", + [ + Column("id", String, primary_key=True), + Column("description", String), + ], + create_table=True, + ) + profiles_dt = ds.create_table("profiles", profiles_store) + + # Выходная таблица + output_store = TableStoreDB( + dbconn, + "user_profiles", + [ + Column("user_id", String, primary_key=True), + Column("name", String), + Column("description", String), + ], + create_table=True, + ) + output_dt = ds.create_table("user_profiles", output_store) + + # Функция трансформации с отслеживанием вызовов + transform_calls = [] + + def transform_func(users_df, profiles_df): + transform_calls.append({ + "users_count": len(users_df), + "profiles_count": len(profiles_df), + "profiles_ids": sorted(profiles_df["id"].tolist()) if not profiles_df.empty else [] + }) + # Merge by user_id and id + merged = users_df.merge( + profiles_df, + left_on="user_id", + right_on="id", + how="left" + ) + return merged[["user_id", "name", "description"]].fillna("") + + # ОТСЛЕЖИВАНИЕ ВЫЗОВОВ get_data для profiles_dt + get_data_calls = [] + original_get_data = profiles_dt.get_data + + def tracked_get_data(idx=None, **kwargs): + if idx is not None: + get_data_calls.append({ + "idx_columns": list(idx.columns), + "idx_values": sorted(idx["id"].tolist()) if "id" in idx.columns else [], + "idx_length": len(idx) + }) + result = original_get_data(idx=idx, **kwargs) + return result + + profiles_dt.get_data = tracked_get_data # type: ignore[method-assign] + + # Создаем step с filtered join + step = BatchTransformStep( + ds=ds, + name="test_filtered_join", + func=transform_func, + input_dts=[ + ComputeInput(dt=users_dt, join_type="full"), + ComputeInput( + dt=profiles_dt, + join_type="full", + join_keys={"user_id": "id"} # user_id из idx -> id из profiles + ), + ], + output_dts=[output_dt], + transform_keys=["user_id"], + chunk_size=10, + ) + + # Добавляем данные: только 3 пользователя + now = time.time() + users_dt.store_chunk( + pd.DataFrame({ + "user_id": ["u1", "u2", "u3"], + "name": ["Alice", "Bob", "Charlie"] + }), + now=now + ) + + # Добавляем много профилей (100 записей), но только 3 связаны с пользователями + profiles_data = [] + for i in range(100): + profiles_data.append({ + "id": f"p{i}", + "description": f"Profile {i}" + }) + # Добавляем профили для наших пользователей + profiles_data.extend([ + {"id": "u1", "description": "Alice's profile"}, + {"id": "u2", "description": "Bob's profile"}, + {"id": "u3", "description": "Charlie's profile"}, + ]) + + profiles_dt.store_chunk(pd.DataFrame(profiles_data), now=now) + + # Запускаем трансформацию + step.run_full(ds) + + # Проверяем что трансформация вызвалась + assert len(transform_calls) > 0 + + # ПРОВЕРКА 1: get_data был вызван с filtered idx + assert len(get_data_calls) > 0, "get_data should be called with filtered idx" + + first_get_data_call = get_data_calls[0] + assert "id" in first_get_data_call["idx_columns"], ( + f"idx should contain 'id' column for filtered join, " + f"but got columns: {first_get_data_call['idx_columns']}" + ) + assert first_get_data_call["idx_length"] == 3, ( + f"Filtered idx should contain 3 records (u1, u2, u3), " + f"but got {first_get_data_call['idx_length']}" + ) + assert first_get_data_call["idx_values"] == ["u1", "u2", "u3"], ( + f"Filtered idx should contain correct user_ids mapped to profile ids, " + f"but got {first_get_data_call['idx_values']}" + ) + + # ПРОВЕРКА 2: filtered join должен читать только 3 профиля, + # а не все 103 записи из таблицы profiles + first_call = transform_calls[0] + assert first_call["users_count"] == 3, "Should process 3 users" + assert first_call["profiles_count"] == 3, ( + f"Filtered join should read only 3 profiles (for u1, u2, u3), " + f"but got {first_call['profiles_count']}" + ) + assert first_call["profiles_ids"] == ["u1", "u2", "u3"], ( + "Should read profiles only for existing users" + ) + + # Проверяем результат + output_data = output_dt.get_data().sort_values("user_id").reset_index(drop=True) + assert len(output_data) == 3 + assert output_data["user_id"].tolist() == ["u1", "u2", "u3"] + assert output_data["description"].tolist() == [ + "Alice's profile", + "Bob's profile", + "Charlie's profile" + ] + + +def test_join_keys_correctness(dbconn: DBConn): + """ + Тест 2: Проверяет что join происходит по правильным ключам. + + Сценарий: + - Основная таблица orders с order_id и customer_id как часть primary key + - Справочник customers с id и другими полями + - join_keys: {"customer_id": "id"} + - Проверяем что данные джойнятся правильно + """ + ds = DataStore(dbconn, create_meta_table=True) + + # Таблица заказов с customer_id в primary key + orders_store = TableStoreDB( + dbconn, + "orders", + [ + Column("order_id", String, primary_key=True), + Column("customer_id", String, primary_key=True), + Column("amount", Integer), + ], + create_table=True, + ) + orders_dt = ds.create_table("orders", orders_store) + + # Таблица покупателей + customers_store = TableStoreDB( + dbconn, + "customers", + [ + Column("id", String, primary_key=True), + Column("name", String), + ], + create_table=True, + ) + customers_dt = ds.create_table("customers", customers_store) + + # Выходная таблица + output_store = TableStoreDB( + dbconn, + "enriched_orders", + [ + Column("order_id", String, primary_key=True), + Column("customer_id", String, primary_key=True), + Column("customer_name", String), + Column("amount", Integer), + ], + create_table=True, + ) + output_dt = ds.create_table("enriched_orders", output_store) + + def transform_func(orders_df, customers_df): + # Join по customer_id = id + merged = orders_df.merge( + customers_df, + left_on="customer_id", + right_on="id", + how="left" + ) + return merged[["order_id", "customer_id", "name", "amount"]].rename(columns={"name": "customer_name"}) + + # Step с join_keys + step = BatchTransformStep( + ds=ds, + name="test_join_keys", + func=transform_func, + input_dts=[ + ComputeInput(dt=orders_dt, join_type="full"), + ComputeInput( + dt=customers_dt, + join_type="full", + join_keys={"customer_id": "id"} + ), + ], + output_dts=[output_dt], + transform_keys=["order_id", "customer_id"], + ) + + # Добавляем данные + now = time.time() + orders_dt.store_chunk( + pd.DataFrame({ + "order_id": ["o1", "o2", "o3"], + "customer_id": ["c1", "c2", "c1"], + "amount": [100, 200, 150] + }), + now=now + ) + + customers_dt.store_chunk( + pd.DataFrame({ + "id": ["c1", "c2", "c3"], # c3 не используется + "name": ["John", "Jane", "Bob"] + }), + now=now + ) + + # Запускаем + step.run_full(ds) + + # Проверяем результат + output_data = output_dt.get_data().sort_values("order_id").reset_index(drop=True) + assert len(output_data) == 3 + + # КЛЮЧЕВАЯ ПРОВЕРКА: join keys работают правильно + expected = pd.DataFrame({ + "order_id": ["o1", "o2", "o3"], + "customer_id": ["c1", "c2", "c1"], + "customer_name": ["John", "Jane", "John"], + "amount": [100, 200, 150] + }) + pd.testing.assert_frame_equal(output_data, expected) + + +def test_v1_vs_v2_results_identical(dbconn: DBConn): + """ + Тест 3: Сравнивает результаты v1 (FULL OUTER JOIN) и v2 (offset-based). + + Сценарий: + - Две входные таблицы с мульти-табличной трансформацией + - Запускаем одну и ту же трансформацию с v1 и v2 + - Проверяем что результаты идентичны + """ + ds = DataStore(dbconn, create_meta_table=True) + + # Входные таблицы + table_a_store = TableStoreDB( + dbconn, + "table_a", + [ + Column("id", String, primary_key=True), + Column("value_a", Integer), + ], + create_table=True, + ) + table_a_dt = ds.create_table("table_a", table_a_store) + + table_b_store = TableStoreDB( + dbconn, + "table_b", + [ + Column("id", String, primary_key=True), + Column("value_b", Integer), + ], + create_table=True, + ) + table_b_dt = ds.create_table("table_b", table_b_store) + + # Две выходные таблицы - одна для v1, другая для v2 + output_v1_store = TableStoreDB( + dbconn, + "output_v1", + [ + Column("id", String, primary_key=True), + Column("sum_value", Integer), + ], + create_table=True, + ) + output_v1_dt = ds.create_table("output_v1", output_v1_store) + + output_v2_store = TableStoreDB( + dbconn, + "output_v2", + [ + Column("id", String, primary_key=True), + Column("sum_value", Integer), + ], + create_table=True, + ) + output_v2_dt = ds.create_table("output_v2", output_v2_store) + + def transform_func(df_a, df_b): + # Объединяем по id + merged = df_a.merge(df_b, on="id", how="outer") + merged["sum_value"] = merged["value_a"].fillna(0) + merged["value_b"].fillna(0) + return merged[["id", "sum_value"]].astype({"sum_value": int}) + + # Step v1 (без offset) + step_v1 = BatchTransformStep( + ds=ds, + name="test_transform_v1", + func=transform_func, + input_dts=[ + ComputeInput(dt=table_a_dt, join_type="full"), + ComputeInput(dt=table_b_dt, join_type="full"), + ], + output_dts=[output_v1_dt], + transform_keys=["id"], + use_offset_optimization=False, + ) + + # Step v2 (с offset) + step_v2 = BatchTransformStep( + ds=ds, + name="test_transform_v2", + func=transform_func, + input_dts=[ + ComputeInput(dt=table_a_dt, join_type="full"), + ComputeInput(dt=table_b_dt, join_type="full"), + ], + output_dts=[output_v2_dt], + transform_keys=["id"], + use_offset_optimization=True, + ) + + # Добавляем данные + now = time.time() + table_a_dt.store_chunk( + pd.DataFrame({ + "id": ["1", "2", "3"], + "value_a": [10, 20, 30] + }), + now=now + ) + table_b_dt.store_chunk( + pd.DataFrame({ + "id": ["2", "3", "4"], + "value_b": [100, 200, 300] + }), + now=now + ) + + # Запускаем обе трансформации + step_v1.run_full(ds) + step_v2.run_full(ds) + + # Сравниваем результаты + result_v1 = output_v1_dt.get_data().sort_values("id").reset_index(drop=True) + result_v2 = output_v2_dt.get_data().sort_values("id").reset_index(drop=True) + + # КЛЮЧЕВАЯ ПРОВЕРКА: результаты v1 и v2 должны быть идентичны + pd.testing.assert_frame_equal(result_v1, result_v2) + + # Проверяем корректность результатов + expected = pd.DataFrame({ + "id": ["1", "2", "3", "4"], + "sum_value": [10, 120, 230, 300] + }) + pd.testing.assert_frame_equal(result_v1, expected) + + # === Инкрементальная обработка === + # Добавляем новые данные + time.sleep(0.01) + now2 = time.time() + table_a_dt.store_chunk( + pd.DataFrame({ + "id": ["5"], + "value_a": [50] + }), + now=now2 + ) + + # Запускаем снова + step_v1.run_full(ds) + step_v2.run_full(ds) + + # Снова сравниваем + result_v1 = output_v1_dt.get_data().sort_values("id").reset_index(drop=True) + result_v2 = output_v2_dt.get_data().sort_values("id").reset_index(drop=True) + + pd.testing.assert_frame_equal(result_v1, result_v2) + + # Проверяем что добавилась новая запись + assert len(result_v1) == 5 + assert "5" in result_v1["id"].values From 99353dcb7d0e5d124aed36fdecfd192670cac8eb Mon Sep 17 00:00:00 2001 From: Dmitriy Vinogradov Date: Tue, 28 Oct 2025 13:34:51 +0300 Subject: [PATCH 4/8] [Looky-7769] fix: join with data-table to reach additional_columns --- datapipe/meta/sql_meta.py | 141 +++++++++++++++++++++++++++++++------- 1 file changed, 115 insertions(+), 26 deletions(-) diff --git a/datapipe/meta/sql_meta.py b/datapipe/meta/sql_meta.py index 4e87e23f..933371e3 100644 --- a/datapipe/meta/sql_meta.py +++ b/datapipe/meta/sql_meta.py @@ -893,42 +893,126 @@ def build_changed_idx_sql_v2( changed_ctes = [] for inp in input_dts: tbl = inp.dt.meta_table.sql_table - # Выбираем все ключи, которые есть в этой таблице - keys = [k for k in all_select_keys if k in inp.dt.primary_keys] - if len(keys) == 0: + # Разделяем ключи на те, что есть в meta table, и те, что нужны из data table + meta_cols = [c.name for c in tbl.columns] + keys_in_meta = [k for k in all_select_keys if k in meta_cols] + keys_in_data_only = [k for k in all_select_keys if k not in meta_cols] + + if len(keys_in_meta) == 0: continue - select_cols: List[Any] = [sa.column(k) for k in keys] offset = offsets[inp.dt.name] - # SELECT keys FROM input_meta WHERE update_ts > offset OR delete_ts > offset - # Включаем как обновленные, так и удаленные записи - changed_sql: Any = sa.select(*select_cols).select_from(tbl).where( - sa.or_( - tbl.c.update_ts > offset, - sa.and_( - tbl.c.delete_ts.isnot(None), - tbl.c.delete_ts > offset + # Если все ключи есть в meta table - используем простой запрос + if len(keys_in_data_only) == 0: + select_cols: List[Any] = [sa.column(k) for k in keys_in_meta] + + # SELECT keys FROM input_meta WHERE update_ts > offset OR delete_ts > offset + changed_sql: Any = sa.select(*select_cols).select_from(tbl).where( + sa.or_( + tbl.c.update_ts > offset, + sa.and_( + tbl.c.delete_ts.isnot(None), + tbl.c.delete_ts > offset + ) ) ) - ) - # Применить filters_idx и run_config - changed_sql = sql_apply_filters_idx_to_subquery(changed_sql, keys, filters_idx) - changed_sql = sql_apply_runconfig_filter(changed_sql, tbl, inp.dt.primary_keys, run_config) + # Применить filters_idx и run_config + changed_sql = sql_apply_filters_idx_to_subquery(changed_sql, keys_in_meta, filters_idx) + changed_sql = sql_apply_runconfig_filter(changed_sql, tbl, inp.dt.primary_keys, run_config) - if len(select_cols) > 0: - changed_sql = changed_sql.group_by(*select_cols) + if len(select_cols) > 0: + changed_sql = changed_sql.group_by(*select_cols) + else: + # Есть колонки только в data table - нужен JOIN с data table + # Проверяем что у table_store есть data_table (для TableStoreDB) + if not hasattr(inp.dt.table_store, 'data_table'): + # Fallback: если нет data_table, используем только meta keys + select_cols = [sa.column(k) for k in keys_in_meta] + changed_sql = sa.select(*select_cols).select_from(tbl).where( + sa.or_( + tbl.c.update_ts > offset, + sa.and_( + tbl.c.delete_ts.isnot(None), + tbl.c.delete_ts > offset + ) + ) + ) + changed_sql = sql_apply_filters_idx_to_subquery(changed_sql, keys_in_meta, filters_idx) + changed_sql = sql_apply_runconfig_filter(changed_sql, tbl, inp.dt.primary_keys, run_config) + if len(select_cols) > 0: + changed_sql = changed_sql.group_by(*select_cols) + else: + # JOIN meta table с data table для получения дополнительных колонок + data_tbl = inp.dt.table_store.data_table + + # Проверяем какие дополнительные колонки действительно есть в data table + data_cols_available = [c.name for c in data_tbl.columns] + keys_in_data_available = [k for k in keys_in_data_only if k in data_cols_available] + + if len(keys_in_data_available) == 0: + # Fallback: если нужных колонок нет в data table, используем только meta keys + select_cols = [sa.column(k) for k in keys_in_meta] + changed_sql = sa.select(*select_cols).select_from(tbl).where( + sa.or_( + tbl.c.update_ts > offset, + sa.and_( + tbl.c.delete_ts.isnot(None), + tbl.c.delete_ts > offset + ) + ) + ) + changed_sql = sql_apply_filters_idx_to_subquery(changed_sql, keys_in_meta, filters_idx) + changed_sql = sql_apply_runconfig_filter(changed_sql, tbl, inp.dt.primary_keys, run_config) + if len(select_cols) > 0: + changed_sql = changed_sql.group_by(*select_cols) + changed_ctes.append(changed_sql.cte(name=f"{inp.dt.name}_changes")) + continue + + # SELECT meta_keys, data_keys FROM meta JOIN data ON primary_keys + # WHERE update_ts > offset OR delete_ts > offset + select_cols = [tbl.c[k] for k in keys_in_meta] + [data_tbl.c[k] for k in keys_in_data_available] + + # Строим JOIN condition по primary keys + if len(inp.dt.primary_keys) == 1: + join_condition = tbl.c[inp.dt.primary_keys[0]] == data_tbl.c[inp.dt.primary_keys[0]] + else: + join_condition = sa.and_(*[ + tbl.c[pk] == data_tbl.c[pk] for pk in inp.dt.primary_keys + ]) + + changed_sql = sa.select(*select_cols).select_from( + tbl.join(data_tbl, join_condition) + ).where( + sa.or_( + tbl.c.update_ts > offset, + sa.and_( + tbl.c.delete_ts.isnot(None), + tbl.c.delete_ts > offset + ) + ) + ) + + # Применить filters_idx и run_config + all_keys = keys_in_meta + keys_in_data_available + changed_sql = sql_apply_filters_idx_to_subquery(changed_sql, all_keys, filters_idx) + changed_sql = sql_apply_runconfig_filter(changed_sql, tbl, inp.dt.primary_keys, run_config) + + if len(select_cols) > 0: + changed_sql = changed_sql.group_by(*select_cols) changed_ctes.append(changed_sql.cte(name=f"{inp.dt.name}_changes")) # 3. Получить записи с ошибками из TransformMetaTable - # Важно: error_records содержат только transform_keys, не additional_columns + # Важно: error_records должен иметь все колонки из all_select_keys для UNION + # Для additional_columns используем NULL, так как их нет в transform meta table tr_tbl = meta_table.sql_table - error_records_sql: Any = sa.select( - *[sa.column(k) for k in transform_keys] - ).select_from(tr_tbl).where( + error_select_cols = [sa.column(k) for k in transform_keys] + [ + sa.literal(None).label(k) for k in additional_columns + ] + error_records_sql: Any = sa.select(*error_select_cols).select_from(tr_tbl).where( sa.or_( tr_tbl.c.is_success != True, # noqa tr_tbl.c.process_ts.is_(None) @@ -947,17 +1031,22 @@ def build_changed_idx_sql_v2( # 4. Объединить все изменения и ошибки через UNION if len(changed_ctes) == 0: # Если нет входных таблиц с изменениями, используем только ошибки - union_sql: Any = sa.select(*[error_records_cte.c[k] for k in transform_keys]).select_from(error_records_cte) + union_sql: Any = sa.select(*[error_records_cte.c[k] for k in all_select_keys]).select_from(error_records_cte) else: # UNION всех изменений и ошибок # Важно: UNION должен включать все колонки из all_select_keys + # Для отсутствующих колонок используем NULL union_parts = [] for cte in changed_ctes: - # Выбираем только те колонки, которые есть в CTE - union_parts.append(sa.select(*[cte.c[k] for k in all_select_keys if k in cte.c]).select_from(cte)) + # Для каждой колонки из all_select_keys: берем из CTE если есть, иначе NULL + select_cols = [ + cte.c[k] if k in cte.c else sa.literal(None).label(k) + for k in all_select_keys + ] + union_parts.append(sa.select(*select_cols).select_from(cte)) union_parts.append( - sa.select(*[error_records_cte.c[k] for k in transform_keys]).select_from(error_records_cte) + sa.select(*[error_records_cte.c[k] for k in all_select_keys]).select_from(error_records_cte) ) union_sql = sa.union(*union_parts) From 95a2341bee4c4c5d7a99394fe03d73b8ae4fef78 Mon Sep 17 00:00:00 2001 From: Dmitriy Vinogradov Date: Tue, 28 Oct 2025 16:50:26 +0300 Subject: [PATCH 5/8] [Looky-7769] fix: implement reverse join for reference tables in filtered join optimization --- datapipe/meta/sql_meta.py | 69 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 67 insertions(+), 2 deletions(-) diff --git a/datapipe/meta/sql_meta.py b/datapipe/meta/sql_meta.py index 933371e3..fddc97bb 100644 --- a/datapipe/meta/sql_meta.py +++ b/datapipe/meta/sql_meta.py @@ -890,7 +890,16 @@ def build_changed_idx_sql_v2( offsets[inp.dt.name] = 0.0 # 2. Построить CTE для каждой входной таблицы с фильтром по offset + # Для таблиц с join_keys нужен обратный JOIN к основной таблице changed_ctes = [] + + # Сначала находим "основную" таблицу - первую без join_keys + primary_inp = None + for inp in input_dts: + if not inp.join_keys: + primary_inp = inp + break + for inp in input_dts: tbl = inp.dt.meta_table.sql_table @@ -904,12 +913,68 @@ def build_changed_idx_sql_v2( offset = offsets[inp.dt.name] + # ОБРАТНЫЙ JOIN для справочных таблиц с join_keys + # Когда изменяется справочная таблица, нужно найти все записи основной таблицы, + # которые на нее ссылаются + if inp.join_keys and primary_inp and hasattr(primary_inp.dt.table_store, 'data_table'): + # Справочная таблица изменилась - нужен обратный JOIN к основной + primary_data_tbl = primary_inp.dt.table_store.data_table + + # Строим SELECT для всех колонок из all_select_keys основной таблицы + primary_data_cols = [c.name for c in primary_data_tbl.columns] + select_cols = [ + primary_data_tbl.c[k] if k in primary_data_cols else sa.literal(None).label(k) + for k in all_select_keys + ] + + # Обратный JOIN: primary_table.join_key = reference_table.id + # Например: posts.user_id = profiles.id + # inp.join_keys = {'user_id': 'id'} означает: + # 'user_id' - колонка в основной таблице (posts) + # 'id' - колонка в справочной таблице (profiles) + join_conditions = [] + for primary_col, ref_col in inp.join_keys.items(): + if primary_col in primary_data_cols and ref_col in meta_cols: + join_conditions.append(primary_data_tbl.c[primary_col] == tbl.c[ref_col]) + + if len(join_conditions) == 0: + # Не можем построить JOIN - пропускаем эту таблицу + continue + + join_condition = sa.and_(*join_conditions) if len(join_conditions) > 1 else join_conditions[0] + + # SELECT primary_cols FROM reference_meta + # JOIN primary_data ON primary.join_key = reference.id + # WHERE reference.update_ts > offset + changed_sql = sa.select(*select_cols).select_from( + tbl.join(primary_data_tbl, join_condition) + ).where( + sa.or_( + tbl.c.update_ts > offset, + sa.and_( + tbl.c.delete_ts.isnot(None), + tbl.c.delete_ts > offset + ) + ) + ) + + # Применить filters и group by + changed_sql = sql_apply_filters_idx_to_subquery(changed_sql, all_select_keys, filters_idx) + # run_config фильтры применяются к справочной таблице + changed_sql = sql_apply_runconfig_filter(changed_sql, tbl, inp.dt.primary_keys, run_config) + + if len(select_cols) > 0: + changed_sql = changed_sql.group_by(*select_cols) + + changed_ctes.append(changed_sql.cte(name=f"{inp.dt.name}_changes")) + continue + # Если все ключи есть в meta table - используем простой запрос if len(keys_in_data_only) == 0: - select_cols: List[Any] = [sa.column(k) for k in keys_in_meta] + select_cols = [sa.column(k) for k in keys_in_meta] # SELECT keys FROM input_meta WHERE update_ts > offset OR delete_ts > offset - changed_sql: Any = sa.select(*select_cols).select_from(tbl).where( + changed_sql = sa.select(*select_cols).select_from(tbl).where( sa.or_( tbl.c.update_ts > offset, sa.and_( From a45b918381739dd6b5246b9f8f93fb844d9f6772 Mon Sep 17 00:00:00 2001 From: Dmitriy Vinogradov Date: Tue, 28 Oct 2025 17:04:33 +0300 Subject: [PATCH 6/8] [Looky-7769] fix: add type annotation for error_select_cols in sql_meta --- datapipe/meta/sql_meta.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datapipe/meta/sql_meta.py b/datapipe/meta/sql_meta.py index fddc97bb..bb74a195 100644 --- a/datapipe/meta/sql_meta.py +++ b/datapipe/meta/sql_meta.py @@ -1074,7 +1074,7 @@ def build_changed_idx_sql_v2( # Важно: error_records должен иметь все колонки из all_select_keys для UNION # Для additional_columns используем NULL, так как их нет в transform meta table tr_tbl = meta_table.sql_table - error_select_cols = [sa.column(k) for k in transform_keys] + [ + error_select_cols: List[Any] = [sa.column(k) for k in transform_keys] + [ sa.literal(None).label(k) for k in additional_columns ] error_records_sql: Any = sa.select(*error_select_cols).select_from(tr_tbl).where( From 183a66a068f187560f6d2e231e861699b4157da1 Mon Sep 17 00:00:00 2001 From: Dmitriy Vinogradov Date: Wed, 29 Oct 2025 19:59:09 +0300 Subject: [PATCH 7/8] [Looky-7769] fix: create offsets for JoinSpec tables with join_keys during incremental processing --- datapipe/step/batch_transform.py | 46 +++++++-- tests/test_offset_joinspec.py | 165 +++++++++++++++++++++++++++++++ 2 files changed, 203 insertions(+), 8 deletions(-) create mode 100644 tests/test_offset_joinspec.py diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py index 9ad4d768..412a30f1 100644 --- a/datapipe/step/batch_transform.py +++ b/datapipe/step/batch_transform.py @@ -457,7 +457,7 @@ def gen(): def _get_max_update_ts_for_batch( self, ds: DataStore, - input_dt: DataTable, + compute_input: "ComputeInput", processed_idx: IndexDF, ) -> Optional[float]: """ @@ -465,12 +465,16 @@ def _get_max_update_ts_for_batch( Важно: используем processed_idx который содержит только успешно обработанные записи из output_dfs (result.index), а не весь батч idx. + + Для JoinSpec таблиц (с join_keys) используем join_keys для фильтрации вместо primary_keys. + Пример: profiles с join_keys={'user_id': 'id'} фильтруется по profiles.id IN (processed_idx.user_id) """ from datapipe.sql_util import sql_apply_idx_filter_to_table if len(processed_idx) == 0: return None + input_dt = compute_input.dt tbl = input_dt.meta_table.sql_table # Построить запрос с фильтром по processed_idx (только успешно обработанные) @@ -481,13 +485,39 @@ def _get_max_update_ts_for_batch( ) max_ts_expr = func.max(max_of_both) sql = select(max_ts_expr) - # Используем только те ключи, которые есть в processed_idx - idx_keys = list(processed_idx.columns) - filter_keys = [k for k in input_dt.primary_keys if k in idx_keys] - # Если нет общих ключей, не можем отфильтровать - берем максимум по всей таблице - if len(filter_keys) > 0: - sql = sql_apply_idx_filter_to_table(sql, tbl, filter_keys, processed_idx) + # Для JoinSpec таблиц используем join_keys вместо primary_keys + # join_keys: Dict[idx_col -> dt_col], например {'user_id': 'id'} + # Значит: фильтруем dt.id по значениям из processed_idx.user_id + if compute_input.join_keys: + # Для каждого join key создаём фильтр + # Используем только те join keys, у которых idx_col есть в processed_idx + idx_keys = list(processed_idx.columns) + + # Создаём маппинг dt_col -> idx_col для фильтрации + # Пример: {'id': 'user_id'} - фильтровать dt.id по processed_idx.user_id + filter_mapping = {} + for idx_col, dt_col in compute_input.join_keys.items(): + if idx_col in idx_keys: + filter_mapping[dt_col] = idx_col + + if filter_mapping: + # Применяем фильтр используя dt columns как ключи + # Но берем значения из соответствующих idx columns + filter_keys = list(filter_mapping.keys()) + + # Создаём переименованный processed_idx для sql_apply_idx_filter_to_table + # Например: user_id -> id чтобы фильтровать по dt.id + renamed_idx = processed_idx.rename(columns={v: k for k, v in filter_mapping.items()}) + sql = sql_apply_idx_filter_to_table(sql, tbl, filter_keys, renamed_idx) + else: + # Для обычных таблиц используем primary_keys + idx_keys = list(processed_idx.columns) + filter_keys = [k for k in input_dt.primary_keys if k in idx_keys] + + # Если нет общих ключей, не можем отфильтровать - берем максимум по всей таблице + if len(filter_keys) > 0: + sql = sql_apply_idx_filter_to_table(sql, tbl, filter_keys, processed_idx) with ds.meta_dbconn.con.begin() as con: result = con.execute(sql).scalar() @@ -556,7 +586,7 @@ def store_batch_result( for inp in self.input_dts: # Найти максимальный update_ts из УСПЕШНО обработанного батча - max_update_ts = self._get_max_update_ts_for_batch(ds, inp.dt, processed_idx) + max_update_ts = self._get_max_update_ts_for_batch(ds, inp, processed_idx) if max_update_ts is not None: offsets_to_update[(self.get_name(), inp.dt.name)] = max_update_ts diff --git a/tests/test_offset_joinspec.py b/tests/test_offset_joinspec.py new file mode 100644 index 00000000..0b6dc857 --- /dev/null +++ b/tests/test_offset_joinspec.py @@ -0,0 +1,165 @@ +""" +Тест для проверки что offset'ы создаются для JoinSpec таблиц (с join_keys). + +Воспроизводит баг где offset создавался только для главной таблицы (posts), +но не для справочной таблицы (profiles) с join_keys. +""" + +import time + +import pandas as pd +from sqlalchemy import Column, Integer, String + +from datapipe.compute import ComputeInput +from datapipe.datatable import DataStore +from datapipe.step.batch_transform import BatchTransformStep +from datapipe.store.database import DBConn, TableStoreDB + + +def test_offset_created_for_joinspec_tables(dbconn: DBConn): + """ + Проверяет что offset создается для таблиц с join_keys (JoinSpec). + + Сценарий: + 1. Создаём posts и profiles (profiles с join_keys={'user_id': 'id'}) + 2. Запускаем трансформацию с offset optimization + 3. Проверяем что offset создан ДЛЯ ОБЕИХ таблиц: posts И profiles + """ + ds = DataStore(dbconn, create_meta_table=True) + + # 1. Создать posts таблицу (используем String для id чтобы совпадать с мета-таблицей) + posts_store = TableStoreDB( + dbconn, + "posts", + [ + Column("id", String, primary_key=True), + Column("user_id", String), + Column("content", String), + ], + create_table=True, + ) + posts = ds.create_table("posts", posts_store) + + # 2. Создать profiles таблицу (справочник) + profiles_store = TableStoreDB( + dbconn, + "profiles", + [Column("id", String, primary_key=True), Column("username", String)], + create_table=True, + ) + profiles = ds.create_table("profiles", profiles_store) + + # 3. Создать output таблицу (id - primary key, остальное - данные) + output_store = TableStoreDB( + dbconn, + "posts_with_username", + [ + Column("id", String, primary_key=True), + Column("user_id", String), # Обычная колонка, не primary key + Column("content", String), + Column("username", String), + ], + create_table=True, + ) + output_dt = ds.create_table("posts_with_username", output_store) + + # 4. Добавить данные + process_ts = time.time() + + # 3 поста от 2 пользователей + posts_df = pd.DataFrame([ + {"id": "1", "user_id": "1", "content": "Post 1"}, + {"id": "2", "user_id": "1", "content": "Post 2"}, + {"id": "3", "user_id": "2", "content": "Post 3"}, + ]) + posts.store_chunk(posts_df, now=process_ts) + + # 2 профиля + profiles_df = pd.DataFrame([ + {"id": "1", "username": "alice"}, + {"id": "2", "username": "bob"}, + ]) + profiles.store_chunk(profiles_df, now=process_ts) + + # 5. Создать трансформацию с join_keys + def transform_func(posts_df, profiles_df): + # JOIN posts + profiles + result = posts_df.merge(profiles_df, left_on="user_id", right_on="id", suffixes=("", "_profile")) + return result[["id", "user_id", "content", "username"]] + + step = BatchTransformStep( + ds=ds, + name="test_transform", + func=transform_func, + input_dts=[ + ComputeInput(dt=posts, join_type="full"), # Главная таблица + ComputeInput(dt=profiles, join_type="inner", join_keys={"user_id": "id"}), # JoinSpec таблица + ], + output_dts=[output_dt], + transform_keys=["id"], # Primary key первой таблицы (posts) + use_offset_optimization=True, # ВАЖНО: используем offset optimization + ) + + # 6. Запустить трансформацию + print("\n🚀 Running initial transformation...") + step.run_full(ds) + + # Проверяем результаты трансформации + output_data = output_dt.get_data() + print(f"✅ Output rows created: {len(output_data)}") + print(f"Output data:\n{output_data}") + + # 7. Проверить что offset'ы созданы для ОБЕИХ таблиц + print("\n🔍 Checking offsets...") + # Используем step.get_name() чтобы получить имя с хэшем + transform_name = step.get_name() + print(f"🔑 Transform name with hash: {transform_name}") + offsets = ds.offset_table.get_offsets_for_transformation(transform_name) + + print(f"📊 Offsets created: {offsets}") + + # КРИТИЧЕСКИ ВАЖНО: offset должен быть для posts И для profiles! + assert "posts" in offsets, "Offset for 'posts' table not found!" + assert "profiles" in offsets, "Offset for 'profiles' table not found! (БАГ!)" + + # Оба offset'а должны быть >= process_ts + assert offsets["posts"] >= process_ts, f"posts offset {offsets['posts']} < process_ts {process_ts}" + assert offsets["profiles"] >= process_ts, f"profiles offset {offsets['profiles']} < process_ts {process_ts}" + + # Проверяем что были созданы 3 записи в output + output_data = output_dt.get_data() + assert len(output_data) == 3, f"Expected 3 output rows, got {len(output_data)}" + + # 8. Добавим новые данные и проверим инкрементальную обработку + time.sleep(0.01) # Небольшая задержка для различения timestamp'ов + process_ts2 = time.time() + + # Добавляем 1 новый пост + new_posts_df = pd.DataFrame([ + {"id": "4", "user_id": "1", "content": "New Post 4"}, + ]) + posts.store_chunk(new_posts_df, now=process_ts2) + + # Добавляем 1 новый профиль + new_profiles_df = pd.DataFrame([ + {"id": "3", "username": "charlie"}, + ]) + profiles.store_chunk(new_profiles_df, now=process_ts2) + + # 9. Запускаем инкрементальную обработку + step.run_full(ds) + + # 10. Проверяем что offset'ы обновились + new_offsets = ds.offset_table.get_offsets_for_transformation(transform_name) + + print(f"\n📊 New offsets after incremental run: {new_offsets}") + + # Оба offset'а должны обновиться до process_ts2 + assert new_offsets["posts"] >= process_ts2, f"posts offset not updated: {new_offsets['posts']} < {process_ts2}" + assert new_offsets["profiles"] >= process_ts2, f"profiles offset not updated: {new_offsets['profiles']} < {process_ts2}" + + # Проверяем что теперь 4 записи в output (3 старых + 1 новый пост) + output_data = output_dt.get_data() + assert len(output_data) == 4, f"Expected 4 output rows, got {len(output_data)}" + + print("\n✅ SUCCESS: Offsets created and updated for both posts AND profiles (including JoinSpec table)!") From f794999c3258e16104a144c9acefec230cf390d6 Mon Sep 17 00:00:00 2001 From: Dmitriy Vinogradov Date: Wed, 29 Oct 2025 20:28:01 +0300 Subject: [PATCH 8/8] [Looky-7769] feat: add test for three-table filtered join with v1 vs v2 comparison --- tests/test_multi_table_filtered_join.py | 254 ++++++++++++++++++++++++ 1 file changed, 254 insertions(+) diff --git a/tests/test_multi_table_filtered_join.py b/tests/test_multi_table_filtered_join.py index 8202f898..1c8b41d3 100644 --- a/tests/test_multi_table_filtered_join.py +++ b/tests/test_multi_table_filtered_join.py @@ -455,3 +455,257 @@ def transform_func(df_a, df_b): # Проверяем что добавилась новая запись assert len(result_v1) == 5 assert "5" in result_v1["id"].values + + +def test_three_tables_filtered_join(dbconn: DBConn): + """ + Тест 4: Проверяет работу filtered join с ТРЕМЯ таблицами. + + Сценарий: + - Основная таблица posts с post_id, user_id, category_id + - Справочник users с user_id + - Справочник categories с category_id + - join_keys для обоих справочников + - Проверяем что все три таблицы корректно джойнятся и filtered join работает + """ + ds = DataStore(dbconn, create_meta_table=True) + + # Основная таблица: посты + posts_store = TableStoreDB( + dbconn, + "posts", + [ + Column("id", String, primary_key=True), + Column("user_id", String), + Column("category_id", String), + Column("content", String), + ], + create_table=True, + ) + posts_dt = ds.create_table("posts", posts_store) + + # Справочник 1: пользователи + users_store = TableStoreDB( + dbconn, + "users", + [ + Column("id", String, primary_key=True), + Column("username", String), + ], + create_table=True, + ) + users_dt = ds.create_table("users", users_store) + + # Справочник 2: категории + categories_store = TableStoreDB( + dbconn, + "categories", + [ + Column("id", String, primary_key=True), + Column("category_name", String), + ], + create_table=True, + ) + categories_dt = ds.create_table("categories", categories_store) + + # Выходная таблица + output_store = TableStoreDB( + dbconn, + "enriched_posts", + [ + Column("id", String, primary_key=True), + Column("content", String), + Column("username", String), + Column("category_name", String), + ], + create_table=True, + ) + output_dt = ds.create_table("enriched_posts", output_store) + + # Отслеживание вызовов get_data для проверки filtered join + users_get_data_calls = [] + categories_get_data_calls = [] + + original_users_get_data = users_dt.get_data + original_categories_get_data = categories_dt.get_data + + def tracked_users_get_data(idx=None, **kwargs): + if idx is not None: + users_get_data_calls.append({ + "idx_columns": list(idx.columns), + "idx_values": sorted(idx["id"].tolist()) if "id" in idx.columns else [], + "idx_length": len(idx) + }) + return original_users_get_data(idx=idx, **kwargs) + + def tracked_categories_get_data(idx=None, **kwargs): + if idx is not None: + categories_get_data_calls.append({ + "idx_columns": list(idx.columns), + "idx_values": sorted(idx["id"].tolist()) if "id" in idx.columns else [], + "idx_length": len(idx) + }) + return original_categories_get_data(idx=idx, **kwargs) + + users_dt.get_data = tracked_users_get_data # type: ignore[method-assign] + categories_dt.get_data = tracked_categories_get_data # type: ignore[method-assign] + + # Функция трансформации: join всех трёх таблиц + def transform_func(posts_df, users_df, categories_df): + # Join с users + result = posts_df.merge( + users_df, + left_on="user_id", + right_on="id", + how="left", + suffixes=("", "_user") + ) + # Join с categories + result = result.merge( + categories_df, + left_on="category_id", + right_on="id", + how="left", + suffixes=("", "_cat") + ) + return result[["id", "content", "username", "category_name"]] + + # Step с двумя filtered joins + # join_keys работает только с v2 (offset optimization) + step = BatchTransformStep( + ds=ds, + name="test_three_tables", + func=transform_func, + input_dts=[ + ComputeInput(dt=posts_dt, join_type="full"), # Основная таблица + ComputeInput( + dt=users_dt, + join_type="full", + join_keys={"user_id": "id"} # Filtered join: posts.user_id -> users.id + ), + ComputeInput( + dt=categories_dt, + join_type="full", + join_keys={"category_id": "id"} # Filtered join: posts.category_id -> categories.id + ), + ], + output_dts=[output_dt], + transform_keys=["id"], + use_offset_optimization=True, # join_keys требует v2 + ) + + # Добавляем данные + now = time.time() + + # 3 поста от 2 пользователей в 2 категориях + posts_dt.store_chunk( + pd.DataFrame({ + "id": ["p1", "p2", "p3"], + "user_id": ["u1", "u2", "u1"], + "category_id": ["cat1", "cat2", "cat1"], + "content": ["Post 1", "Post 2", "Post 3"] + }), + now=now + ) + + # 100 пользователей (но только u1, u2 используются в постах) + users_data = [{"id": f"u{i}", "username": f"User{i}"} for i in range(3, 100)] + users_data.extend([ + {"id": "u1", "username": "Alice"}, + {"id": "u2", "username": "Bob"}, + ]) + users_dt.store_chunk(pd.DataFrame(users_data), now=now) + + # 50 категорий (но только cat1, cat2 используются в постах) + categories_data = [{"id": f"cat{i}", "category_name": f"Category {i}"} for i in range(3, 50)] + categories_data.extend([ + {"id": "cat1", "category_name": "Tech"}, + {"id": "cat2", "category_name": "News"}, + ]) + categories_dt.store_chunk(pd.DataFrame(categories_data), now=now) + + # Запускаем трансформацию + step.run_full(ds) + + # ПРОВЕРКА 1: Filtered join для users должен читать только u1, u2 + assert len(users_get_data_calls) > 0, "users get_data should be called with filtered idx" + users_call = users_get_data_calls[0] + assert "id" in users_call["idx_columns"] + assert users_call["idx_length"] == 2, f"Should filter to 2 users, got {users_call['idx_length']}" + assert sorted(users_call["idx_values"]) == ["u1", "u2"], ( + f"Should filter users to u1, u2, got {users_call['idx_values']}" + ) + + # ПРОВЕРКА 2: Filtered join для categories должен читать только cat1, cat2 + assert len(categories_get_data_calls) > 0, "categories get_data should be called with filtered idx" + categories_call = categories_get_data_calls[0] + assert "id" in categories_call["idx_columns"] + assert categories_call["idx_length"] == 2, ( + f"Should filter to 2 categories, got {categories_call['idx_length']}" + ) + assert sorted(categories_call["idx_values"]) == ["cat1", "cat2"], ( + f"Should filter categories to cat1, cat2, got {categories_call['idx_values']}" + ) + + # ПРОВЕРКА 3: Результат должен содержать правильные данные + output_data = output_dt.get_data().sort_values("id").reset_index(drop=True) + assert len(output_data) == 3 + + expected = pd.DataFrame({ + "id": ["p1", "p2", "p3"], + "content": ["Post 1", "Post 2", "Post 3"], + "username": ["Alice", "Bob", "Alice"], + "category_name": ["Tech", "News", "Tech"] + }) + pd.testing.assert_frame_equal(output_data, expected) + + print("\n✅ Three tables filtered join test passed!") + print(f" - Posts: 3 records") + print(f" - Users: filtered from {len(users_data)} to 2 records (u1, u2)") + print(f" - Categories: filtered from {len(categories_data)} to 2 records (cat1, cat2)") + print(f" - Output: 3 enriched posts with correct joins") + + # === ДОПОЛНИТЕЛЬНАЯ ПРОВЕРКА: сравнение v1 vs v2 === + # Создаём отдельную выходную таблицу для v1 + output_v1_store = TableStoreDB( + dbconn, + "enriched_posts_v1", + [ + Column("id", String, primary_key=True), + Column("content", String), + Column("username", String), + Column("category_name", String), + ], + create_table=True, + ) + output_v1_dt = ds.create_table("enriched_posts_v1", output_v1_store) + + # Step v1 БЕЗ join_keys (обычный FULL OUTER JOIN всех таблиц) + step_v1 = BatchTransformStep( + ds=ds, + name="test_three_tables_v1", + func=transform_func, + input_dts=[ + ComputeInput(dt=posts_dt, join_type="full"), + ComputeInput(dt=users_dt, join_type="full"), # БЕЗ join_keys + ComputeInput(dt=categories_dt, join_type="full"), # БЕЗ join_keys + ], + output_dts=[output_v1_dt], + transform_keys=["id"], + use_offset_optimization=False, # v1 + ) + + # Запускаем v1 + step_v1.run_full(ds) + + # Сравниваем результаты v1 и v2 + result_v1 = output_v1_dt.get_data().sort_values("id").reset_index(drop=True) + result_v2 = output_data # Уже отсортирован + + # КЛЮЧЕВАЯ ПРОВЕРКА: результаты v1 и v2 должны быть идентичны + pd.testing.assert_frame_equal(result_v1, result_v2) + + print("\n✅ V1 vs V2 comparison PASSED!") + print(f" - V1 (FULL OUTER JOIN): {len(result_v1)} rows") + print(f" - V2 (offset + filtered join): {len(result_v2)} rows") + print(f" - Results are identical ✓")