From 624d833c892b8ff8f7001994feaa6a5f44c68bab Mon Sep 17 00:00:00 2001 From: Roderick Dunn Date: Fri, 17 Feb 2023 16:00:50 -0500 Subject: [PATCH 1/4] Range query no longer filters on min/max key range --- data_diff/table_segment.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/data_diff/table_segment.py b/data_diff/table_segment.py index 725beb39..1d7025b2 100644 --- a/data_diff/table_segment.py +++ b/data_diff/table_segment.py @@ -104,10 +104,15 @@ def _make_update_range(self): def source_table(self): return table(*self.table_path, schema=self._schema) - def make_select(self): - return self.source_table.where( - *self._make_key_range(), *self._make_update_range(), Code(self._where()) if self.where else SKIP - ) + def make_select(self, include_key_range=True): + if include_key_range: + return self.source_table.where( + *self._make_key_range(), *self._make_update_range(), Code(self._where()) if self.where else SKIP + ) + else: + return self.source_table.where( + *self._make_update_range(), Code(self._where()) if self.where else SKIP + ) def get_values(self) -> list: "Download all the relevant values of the segment from the database" @@ -187,7 +192,7 @@ def query_key_range(self) -> Tuple[int, int]: """Query database for minimum and maximum key. This is used for setting the initial bounds.""" # Normalizes the result (needed for UUIDs) after the min/max computation (k,) = self.key_columns - select = self.make_select().select( + select = self.make_select(include_key_range=False).select( ApplyFuncAndNormalizeAsString(this[k], min_), ApplyFuncAndNormalizeAsString(this[k], max_), ) From ef8494c43353b94d3e2a521ceced06e478c6fc03 Mon Sep 17 00:00:00 2001 From: Roderick Dunn Date: Fri, 24 Feb 2023 17:23:31 -0500 Subject: [PATCH 2/4] Bypass key range query if min/max provided --- data_diff/diff_tables.py | 21 +++++++++++++++++---- data_diff/table_segment.py | 18 +++++++++++++++++- 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/data_diff/diff_tables.py b/data_diff/diff_tables.py index 3cd90360..c44fc58b 100644 --- a/data_diff/diff_tables.py +++ b/data_diff/diff_tables.py @@ -60,9 +60,9 @@ def _thread_as_completed(self, func, iterable): for future in as_completed(futures): yield future.result() - def _threaded_call_as_completed(self, func, iterable): + def _threaded_call_as_completed(self, func, iterable, *args, **kwargs): "Calls a method for each object in iterable. Returned in order of completion." - return self._thread_as_completed(methodcaller(func), iterable) + return self._thread_as_completed(methodcaller(func, *args, **kwargs), iterable) @contextmanager def _run_in_background(self, *funcs): @@ -290,8 +290,21 @@ def _bisect_and_diff_tables(self, table1, table2, info_tree): if key_type.python_type is not key_type2.python_type: raise TypeError(f"Incompatible key types: {key_type} and {key_type2}") - # Query min/max values - key_ranges = self._threaded_call_as_completed("query_key_range", [table1, table2]) + if all([table1.min_key, table1.max_key, table2.min_key, table2.max_key]): + key_ranges = (kr for kr in [(table1.min_key, table1.max_key), (table2.min_key, table2.max_key)]) + elif table1.min_key and table2.min_key: + logger.debug('Querying for max_key') + max_keys = self._threaded_call_as_completed("query_key_bound", [table1, table2], bound='max') + min_keys = (table1.min_key, table2.min_key) + key_ranges = zip(min_keys, max_keys) + elif table1.max_key and table2.max_key: + logger.debug('Querying for min_key') + min_keys = self._threaded_call_as_completed("query_key_bound", [table1, table2], bound='min') + max_keys = (table1.max_key, table2.max_key) + key_ranges = zip(min_keys, max_keys) + else: + # Query min/max values + key_ranges = self._threaded_call_as_completed("query_key_range", [table1, table2]) # Start with the first completed value, so we don't waste time waiting min_key1, max_key1 = self._parse_key_range_result(key_type, next(key_ranges)) diff --git a/data_diff/table_segment.py b/data_diff/table_segment.py index 1d7025b2..33e6cc9a 100644 --- a/data_diff/table_segment.py +++ b/data_diff/table_segment.py @@ -1,5 +1,5 @@ import time -from typing import List, Tuple +from typing import List, Tuple, Literal import logging from runtype import dataclass @@ -203,6 +203,22 @@ def query_key_range(self) -> Tuple[int, int]: return min_key, max_key + def query_key_bound(self, bound: Literal['min', 'max']) -> int: + """Query database for min OR max of key. This is used for setting the initial bounds.""" + # Normalizes the result (needed for UUIDs) after the min computation + + bound_expr = min_ if bound == 'min' else max_ + (k,) = self.key_columns + select = self.make_select(include_key_range=False).select( + ApplyFuncAndNormalizeAsString(this[k], bound_expr) + ) + min_or_max_key = self.database.query(select, str) + + if min_or_max_key is None: + raise ValueError("Table appears to be empty") + + return min_or_max_key[0][0] + @property def is_bounded(self): return self.min_key is not None and self.max_key is not None From 1cfc413b165ab5d879d8d8222a41997d7745e4a3 Mon Sep 17 00:00:00 2001 From: Roderick Dunn Date: Fri, 24 Feb 2023 17:23:47 -0500 Subject: [PATCH 3/4] tests --- tests/test_diff_tables.py | 45 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/tests/test_diff_tables.py b/tests/test_diff_tables.py index 2a8d2acc..ae9d478f 100644 --- a/tests/test_diff_tables.py +++ b/tests/test_diff_tables.py @@ -2,6 +2,7 @@ from typing import Callable import uuid import unittest +from unittest.mock import patch from sqeleton.queries import table, this, commit from sqeleton.utils import ArithAlphanumeric, numberToAlphanum @@ -299,6 +300,50 @@ def test_diff_sorted_by_key(self): } self.assertEqual(expected, diff) + @patch.object(TableSegment, 'query_key_range') + @patch.object(TableSegment, 'query_key_bound') + def test_key_bounds(self, mock_query_key_bound, mock_query_key_range): + # test range query when no min/max provided + mock_query_key_range.return_value = (0, 10) + _ = list(self.differ.diff_tables(self.table, self.table2)) + mock_query_key_range.assert_called() + mock_query_key_bound.assert_not_called() + + # test no range or bounds query + mock_query_key_range.reset_mock() + mock_query_key_bound.reset_mock() + + tbl1 = self.table.replace(min_key=1, max_key=100) + tbl2 = self.table2.replace(min_key=1, max_key=100) + + _ = list(self.differ.diff_tables(tbl1, tbl2)) + mock_query_key_range.assert_not_called() + mock_query_key_bound.assert_not_called() + + # test query min only + mock_query_key_range.reset_mock() + mock_query_key_bound.reset_mock() + + tbl1 = self.table.replace(max_key=100) + tbl2 = self.table2.replace(max_key=100) + + _ = list(self.differ.diff_tables(tbl1, tbl2)) + mock_query_key_range.assert_not_called() + mock_query_key_bound.assert_called_with(bound='min') + + # test query max only + mock_query_key_range.reset_mock() + mock_query_key_bound.reset_mock() + mock_query_key_bound.return_value = 1000 + + tbl1 = self.table.replace(max_key=None, min_key=100) + tbl2 = self.table2.replace(max_key=None, min_key=100) + + _ = list(self.differ.diff_tables(tbl1, tbl2)) + mock_query_key_range.assert_not_called() + mock_query_key_bound.assert_called_with(bound='max') + + @test_each_database class TestDiffTables2(DiffTestCase): From 6b2d3c35e3cc8744a8749839f0c539c07739dbd2 Mon Sep 17 00:00:00 2001 From: Roderick Dunn Date: Mon, 27 Feb 2023 10:34:06 -0500 Subject: [PATCH 4/4] Simplied logic --- data_diff/diff_tables.py | 29 ++++++++++++++--------------- data_diff/table_segment.py | 18 +----------------- tests/test_diff_tables.py | 30 +++++++----------------------- 3 files changed, 22 insertions(+), 55 deletions(-) diff --git a/data_diff/diff_tables.py b/data_diff/diff_tables.py index c44fc58b..af250afa 100644 --- a/data_diff/diff_tables.py +++ b/data_diff/diff_tables.py @@ -60,9 +60,9 @@ def _thread_as_completed(self, func, iterable): for future in as_completed(futures): yield future.result() - def _threaded_call_as_completed(self, func, iterable, *args, **kwargs): + def _threaded_call_as_completed(self, func, iterable): "Calls a method for each object in iterable. Returned in order of completion." - return self._thread_as_completed(methodcaller(func, *args, **kwargs), iterable) + return self._thread_as_completed(methodcaller(func), iterable) @contextmanager def _run_in_background(self, *funcs): @@ -271,6 +271,14 @@ def _diff_segments( ): ... + def _resolve_key_range(self, key_range_res, usr_key_range): + key_range_res = list(key_range_res) + if usr_key_range[0] is not None: + key_range_res[0] = usr_key_range[0] + if usr_key_range[1] is not None: + key_range_res[1] = usr_key_range[1] + return tuple(key_range_res) + def _bisect_and_diff_tables(self, table1, table2, info_tree): if len(table1.key_columns) > 1: raise NotImplementedError("Composite key not supported yet!") @@ -290,24 +298,15 @@ def _bisect_and_diff_tables(self, table1, table2, info_tree): if key_type.python_type is not key_type2.python_type: raise TypeError(f"Incompatible key types: {key_type} and {key_type2}") - if all([table1.min_key, table1.max_key, table2.min_key, table2.max_key]): + usr_key_range = (table1.min_key, table1.max_key) + if all(k is not None for k in [table1.min_key, table1.max_key, table2.min_key, table2.max_key]): key_ranges = (kr for kr in [(table1.min_key, table1.max_key), (table2.min_key, table2.max_key)]) - elif table1.min_key and table2.min_key: - logger.debug('Querying for max_key') - max_keys = self._threaded_call_as_completed("query_key_bound", [table1, table2], bound='max') - min_keys = (table1.min_key, table2.min_key) - key_ranges = zip(min_keys, max_keys) - elif table1.max_key and table2.max_key: - logger.debug('Querying for min_key') - min_keys = self._threaded_call_as_completed("query_key_bound", [table1, table2], bound='min') - max_keys = (table1.max_key, table2.max_key) - key_ranges = zip(min_keys, max_keys) else: # Query min/max values key_ranges = self._threaded_call_as_completed("query_key_range", [table1, table2]) # Start with the first completed value, so we don't waste time waiting - min_key1, max_key1 = self._parse_key_range_result(key_type, next(key_ranges)) + min_key1, max_key1 = self._parse_key_range_result(key_type, self._resolve_key_range(next(key_ranges), usr_key_range)) table1, table2 = [t.new(min_key=min_key1, max_key=max_key1) for t in (table1, table2)] @@ -321,7 +320,7 @@ def _bisect_and_diff_tables(self, table1, table2, info_tree): ti.submit(self._bisect_and_diff_segments, ti, table1, table2, info_tree) # Now we check for the second min-max, to diff the portions we "missed". - min_key2, max_key2 = self._parse_key_range_result(key_type, next(key_ranges)) + min_key2, max_key2 = self._parse_key_range_result(key_type, self._resolve_key_range(next(key_ranges), usr_key_range)) if min_key2 < min_key1: pre_tables = [t.new(min_key=min_key2, max_key=min_key1) for t in (table1, table2)] diff --git a/data_diff/table_segment.py b/data_diff/table_segment.py index 33e6cc9a..1d7025b2 100644 --- a/data_diff/table_segment.py +++ b/data_diff/table_segment.py @@ -1,5 +1,5 @@ import time -from typing import List, Tuple, Literal +from typing import List, Tuple import logging from runtype import dataclass @@ -203,22 +203,6 @@ def query_key_range(self) -> Tuple[int, int]: return min_key, max_key - def query_key_bound(self, bound: Literal['min', 'max']) -> int: - """Query database for min OR max of key. This is used for setting the initial bounds.""" - # Normalizes the result (needed for UUIDs) after the min computation - - bound_expr = min_ if bound == 'min' else max_ - (k,) = self.key_columns - select = self.make_select(include_key_range=False).select( - ApplyFuncAndNormalizeAsString(this[k], bound_expr) - ) - min_or_max_key = self.database.query(select, str) - - if min_or_max_key is None: - raise ValueError("Table appears to be empty") - - return min_or_max_key[0][0] - @property def is_bounded(self): return self.min_key is not None and self.max_key is not None diff --git a/tests/test_diff_tables.py b/tests/test_diff_tables.py index ae9d478f..f6b55b57 100644 --- a/tests/test_diff_tables.py +++ b/tests/test_diff_tables.py @@ -301,48 +301,32 @@ def test_diff_sorted_by_key(self): self.assertEqual(expected, diff) @patch.object(TableSegment, 'query_key_range') - @patch.object(TableSegment, 'query_key_bound') - def test_key_bounds(self, mock_query_key_bound, mock_query_key_range): + def test_key_bounds(self, mock_query_key_range): # test range query when no min/max provided mock_query_key_range.return_value = (0, 10) _ = list(self.differ.diff_tables(self.table, self.table2)) mock_query_key_range.assert_called() - mock_query_key_bound.assert_not_called() - # test no range or bounds query + # test no range query mock_query_key_range.reset_mock() - mock_query_key_bound.reset_mock() - tbl1 = self.table.replace(min_key=1, max_key=100) tbl2 = self.table2.replace(min_key=1, max_key=100) - _ = list(self.differ.diff_tables(tbl1, tbl2)) mock_query_key_range.assert_not_called() - mock_query_key_bound.assert_not_called() # test query min only mock_query_key_range.reset_mock() - mock_query_key_bound.reset_mock() - tbl1 = self.table.replace(max_key=100) tbl2 = self.table2.replace(max_key=100) - _ = list(self.differ.diff_tables(tbl1, tbl2)) - mock_query_key_range.assert_not_called() - mock_query_key_bound.assert_called_with(bound='min') + mock_query_key_range.assert_called() - # test query max only + # test query min only mock_query_key_range.reset_mock() - mock_query_key_bound.reset_mock() - mock_query_key_bound.return_value = 1000 - - tbl1 = self.table.replace(max_key=None, min_key=100) - tbl2 = self.table2.replace(max_key=None, min_key=100) - + tbl1 = self.table.replace(min_key=0) + tbl2 = self.table2.replace(min_key=0) _ = list(self.differ.diff_tables(tbl1, tbl2)) - mock_query_key_range.assert_not_called() - mock_query_key_bound.assert_called_with(bound='max') - + mock_query_key_range.assert_called() @test_each_database