Skip to content
This repository was archived by the owner on May 17, 2024. It is now read-only.

Commit 243997a

Browse files
authored
Merge pull request #822 from datafold/ignored-columns-at-runtime
Ignore columns at runtime on request (e.g. with too many diffs in them)
2 parents 986c861 + d5d227f commit 243997a

3 files changed

Lines changed: 85 additions & 23 deletions

File tree

data_diff/diff_tables.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
"""Provides classes for performing a table diff
22
"""
3-
3+
import threading
44
import time
55
from abc import ABC, abstractmethod
66
from enum import Enum
77
from contextlib import contextmanager
88
from operator import methodcaller
9-
from typing import Dict, Tuple, Iterator, Optional
9+
from typing import Dict, Set, Tuple, Iterator, Optional
1010
from concurrent.futures import ThreadPoolExecutor, as_completed
1111

1212
import attrs
@@ -184,6 +184,10 @@ class TableDiffer(ThreadBase, ABC):
184184
bisection_factor = 32
185185
stats: dict = {}
186186

187+
ignored_columns1: Set[str] = attrs.field(factory=set)
188+
ignored_columns2: Set[str] = attrs.field(factory=set)
189+
_ignored_columns_lock: threading.Lock = attrs.field(factory=threading.Lock, init=False)
190+
187191
def diff_tables(self, table1: TableSegment, table2: TableSegment, info_tree: InfoTree = None) -> DiffResultWrapper:
188192
"""Diff the given tables.
189193
@@ -353,6 +357,11 @@ def _bisect_and_diff_segments(
353357
biggest_table = max(table1, table2, key=methodcaller("approximate_size"))
354358
checkpoints = biggest_table.choose_checkpoints(self.bisection_factor - 1)
355359

360+
# Get it thread-safe, to avoid segment misalignment because of bad timing.
361+
with self._ignored_columns_lock:
362+
table1 = attrs.evolve(table1, ignored_columns=frozenset(self.ignored_columns1))
363+
table2 = attrs.evolve(table2, ignored_columns=frozenset(self.ignored_columns2))
364+
356365
# Create new instances of TableSegment between each checkpoint
357366
segmented1 = table1.segment_by_checkpoints(checkpoints)
358367
segmented2 = table2.segment_by_checkpoints(checkpoints)
@@ -363,3 +372,24 @@ def _bisect_and_diff_segments(
363372
ti.submit(
364373
self._diff_segments, ti, t1, t2, info_node, max_rows, level + 1, i + 1, len(segmented1), priority=level
365374
)
375+
376+
def ignore_column(self, column_name1: str, column_name2: str) -> None:
377+
"""
378+
Ignore the column (by name on sides A & B) in md5s & diffs from now on.
379+
380+
This affects 2 places:
381+
382+
- The columns are not checksumed for new(!) segments.
383+
- The columns are ignored in in-memory diffing for running segments.
384+
385+
The columns are never ignored in the fetched values, whether they are
386+
the same or different — for data consistency.
387+
388+
Use this feature to collect relatively well-represented differences
389+
across all columns if one of them is highly different in the beginning
390+
of a table (as per the order of segmentation/bisection). Otherwise,
391+
that one column might easily hit the limit and stop the whole diff.
392+
"""
393+
with self._ignored_columns_lock:
394+
self.ignored_columns1.add(column_name1)
395+
self.ignored_columns2.add(column_name2)

data_diff/hashdiff_tables.py

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22
from numbers import Number
33
import logging
44
from collections import defaultdict
5-
from typing import Iterator
5+
from typing import Any, Collection, Dict, Iterator, List, Sequence, Set, Tuple
66

77
import attrs
8+
from typing_extensions import Literal
89

910
from data_diff.abcs.database_types import ColType_UUID, NumericType, PrecisionType, StringType, Boolean, JSON
1011
from data_diff.info_tree import InfoTree
@@ -20,25 +21,42 @@
2021

2122
logger = logging.getLogger("hashdiff_tables")
2223

23-
24-
def diff_sets(a: list, b: list, json_cols: dict = None) -> Iterator:
25-
sa = set(a)
26-
sb = set(b)
24+
# Just for local readability: TODO: later switch to real type declarations of these.
25+
_Op = Literal["+", "-"]
26+
_PK = Any
27+
_Row = Tuple[Any]
28+
29+
30+
def diff_sets(
31+
a: Sequence[_Row],
32+
b: Sequence[_Row],
33+
*,
34+
json_cols: dict = None,
35+
columns1: Sequence[str],
36+
columns2: Sequence[str],
37+
ignored_columns1: Collection[str],
38+
ignored_columns2: Collection[str],
39+
) -> Iterator:
40+
# Differ only by columns of interest (PKs+relevant-ignored). But yield with ignored ones!
41+
sa: Set[_Row] = {tuple(val for col, val in safezip(columns1, row) if col not in ignored_columns1) for row in a}
42+
sb: Set[_Row] = {tuple(val for col, val in safezip(columns2, row) if col not in ignored_columns2) for row in b}
2743

2844
# The first item is always the key (see TableDiffer.relevant_columns)
2945
# TODO update when we add compound keys to hashdiff
30-
d = defaultdict(list)
46+
diffs_by_pks: Dict[_PK, List[Tuple[_Op, _Row]]] = defaultdict(list)
3147
for row in a:
32-
if row not in sb:
33-
d[row[0]].append(("-", row))
48+
cutrow: _Row = tuple(val for col, val in zip(columns1, row) if col not in ignored_columns1)
49+
if cutrow not in sb:
50+
diffs_by_pks[row[0]].append(("-", row))
3451
for row in b:
35-
if row not in sa:
36-
d[row[0]].append(("+", row))
52+
cutrow: _Row = tuple(val for col, val in zip(columns2, row) if col not in ignored_columns2)
53+
if cutrow not in sa:
54+
diffs_by_pks[row[0]].append(("+", row))
3755

3856
warned_diff_cols = set()
39-
for _k, v in sorted(d.items(), key=lambda i: i[0]):
57+
for diffs in (diffs_by_pks[pk] for pk in sorted(diffs_by_pks)):
4058
if json_cols:
41-
parsed_match, overriden_diff_cols = diffs_are_equiv_jsons(v, json_cols)
59+
parsed_match, overriden_diff_cols = diffs_are_equiv_jsons(diffs, json_cols)
4260
if parsed_match:
4361
to_warn = overriden_diff_cols - warned_diff_cols
4462
for w in to_warn:
@@ -48,7 +66,7 @@ def diff_sets(a: list, b: list, json_cols: dict = None) -> Iterator:
4866
)
4967
warned_diff_cols.add(w)
5068
continue
51-
yield from v
69+
yield from diffs
5270

5371

5472
@attrs.define(frozen=False)
@@ -201,7 +219,17 @@ def _bisect_and_diff_segments(
201219
for i, colname in enumerate(table1.extra_columns)
202220
if isinstance(table1._schema[colname], JSON)
203221
}
204-
diff = list(diff_sets(rows1, rows2, json_cols))
222+
diff = list(
223+
diff_sets(
224+
rows1,
225+
rows2,
226+
json_cols=json_cols,
227+
columns1=table1.relevant_columns,
228+
columns2=table2.relevant_columns,
229+
ignored_columns1=self.ignored_columns1,
230+
ignored_columns2=self.ignored_columns1,
231+
)
232+
)
205233

206234
info_tree.info.set_diff(diff)
207235
info_tree.info.rowcounts = {1: len(rows1), 2: len(rows2)}

data_diff/table_segment.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import time
2-
from typing import List, Optional, Tuple
2+
from typing import Container, List, Optional, Tuple
33
import logging
44
from itertools import product
55

@@ -114,6 +114,7 @@ class TableSegment:
114114
key_columns: Tuple[str, ...]
115115
update_column: Optional[str] = None
116116
extra_columns: Tuple[str, ...] = ()
117+
ignored_columns: Container[str] = frozenset()
117118

118119
# Restrict the segment
119120
min_key: Optional[Vector] = None
@@ -179,7 +180,10 @@ def make_select(self):
179180

180181
def get_values(self) -> list:
181182
"Download all the relevant values of the segment from the database"
182-
select = self.make_select().select(*self._relevant_columns_repr)
183+
184+
# Fetch all the original columns, even if some were later excluded from checking.
185+
fetched_cols = [NormalizeAsString(this[c]) for c in self.relevant_columns]
186+
select = self.make_select().select(*fetched_cols)
183187
return self.database.query(select, List[Tuple])
184188

185189
def choose_checkpoints(self, count: int) -> List[List[DbKey]]:
@@ -221,18 +225,18 @@ def relevant_columns(self) -> List[str]:
221225

222226
return list(self.key_columns) + extras
223227

224-
@property
225-
def _relevant_columns_repr(self) -> List[Expr]:
226-
return [NormalizeAsString(this[c]) for c in self.relevant_columns]
227-
228228
def count(self) -> int:
229229
"""Count how many rows are in the segment, in one pass."""
230230
return self.database.query(self.make_select().select(Count()), int)
231231

232232
def count_and_checksum(self) -> Tuple[int, int]:
233233
"""Count and checksum the rows in the segment, in one pass."""
234+
235+
checked_columns = [c for c in self.relevant_columns if c not in self.ignored_columns]
236+
cols = [NormalizeAsString(this[c]) for c in checked_columns]
237+
234238
start = time.monotonic()
235-
q = self.make_select().select(Count(), Checksum(self._relevant_columns_repr))
239+
q = self.make_select().select(Count(), Checksum(cols))
236240
count, checksum = self.database.query(q, tuple)
237241
duration = time.monotonic() - start
238242
if duration > RECOMMENDED_CHECKSUM_DURATION:

0 commit comments

Comments
 (0)