Skip to content
This repository was archived by the owner on May 17, 2024. It is now read-only.

Commit f213b26

Browse files
author
Sergey Vasilyev
committed
Squash abstract database into simply base database
1 parent 6db9f66 commit f213b26

3 files changed

Lines changed: 47 additions & 75 deletions

File tree

data_diff/abcs/database_types.py

Lines changed: 0 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -268,76 +268,6 @@ def to_comparable(self, value: str, coltype: ColType) -> str:
268268
"""Ensure that the expression is comparable in ``IS DISTINCT FROM``."""
269269

270270

271-
from typing import TypeVar, Generic
272-
273-
T_Dialect = TypeVar("T_Dialect", bound=AbstractDialect)
274-
275-
276-
class AbstractDatabase(Generic[T_Dialect]):
277-
@property
278-
@abstractmethod
279-
def dialect(self) -> T_Dialect:
280-
"The dialect of the database. Used internally by Database, and also available publicly."
281-
282-
@classmethod
283-
@abstractmethod
284-
def load_mixins(cls, *abstract_mixins) -> type:
285-
"Extend the dialect with a list of mixins that implement the given abstract mixins."
286-
287-
@property
288-
@abstractmethod
289-
def CONNECT_URI_HELP(self) -> str:
290-
"Example URI to show the user in help and error messages"
291-
292-
@property
293-
@abstractmethod
294-
def CONNECT_URI_PARAMS(self) -> List[str]:
295-
"List of parameters given in the path of the URI"
296-
297-
@abstractmethod
298-
def _query(self, sql_code: str) -> list:
299-
"Send query to database and return result"
300-
301-
@abstractmethod
302-
def query_table_schema(self, path: DbPath) -> Dict[str, tuple]:
303-
"""Query the table for its schema for table in 'path', and return {column: tuple}
304-
where the tuple is (table_name, col_name, type_repr, datetime_precision?, numeric_precision?, numeric_scale?)
305-
306-
Note: This method exists instead of select_table_schema(), just because not all databases support
307-
accessing the schema using a SQL query.
308-
"""
309-
310-
@abstractmethod
311-
def select_table_unique_columns(self, path: DbPath) -> str:
312-
"Provide SQL for selecting the names of unique columns in the table"
313-
314-
@abstractmethod
315-
def query_table_unique_columns(self, path: DbPath) -> List[str]:
316-
"""Query the table for its unique columns for table in 'path', and return {column}"""
317-
318-
@abstractmethod
319-
def _process_table_schema(
320-
self, path: DbPath, raw_schema: Dict[str, tuple], filter_columns: Sequence[str], where: str = None
321-
):
322-
"""Process the result of query_table_schema().
323-
324-
Done in a separate step, to minimize the amount of processed columns.
325-
Needed because processing each column may:
326-
* throw errors and warnings
327-
* query the database to sample values
328-
329-
"""
330-
331-
@abstractmethod
332-
def close(self):
333-
"Close connection(s) to the database instance. Querying will stop functioning."
334-
335-
@property
336-
@abstractmethod
337-
def is_autocommit(self) -> bool:
338-
"Return whether the database autocommits changes. When false, COMMIT statements are skipped."
339-
340-
341271
class AbstractTable(ABC):
342272
@abstractmethod
343273
def select(self, *exprs, distinct=False, **named_exprs) -> "AbstractTable":

data_diff/databases/base.py

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import abc
12
import functools
23
from dataclasses import field
34
from datetime import datetime
@@ -31,7 +32,6 @@
3132
Root, TableAlias, TableOp, TablePath, \
3233
TimeTravel, TruncateTable, UnaryOp, WhenThen, _ResolveColumn
3334
from data_diff.abcs.database_types import (
34-
AbstractDatabase,
3535
Array,
3636
Struct,
3737
AbstractDialect,
@@ -92,7 +92,7 @@ class Compiler(AbstractCompiler):
9292
# Database is needed to normalize tables. Dialect is needed for recursive compilations.
9393
# In theory, it is many-to-many relations: e.g. a generic ODBC driver with multiple dialects.
9494
# In practice, we currently bind the dialects to the specific database classes.
95-
database: AbstractDatabase
95+
database: "Database"
9696

9797
in_select: bool = False # Compilation runtime flag
9898
in_join: bool = False # Compilation runtime flag
@@ -789,7 +789,7 @@ def __getitem__(self, i):
789789
return self.rows[i]
790790

791791

792-
class Database(AbstractDatabase[T]):
792+
class Database(abc.ABC):
793793
"""Base abstract class for databases.
794794
795795
Used for providing connection code and implementation specific SQL utilities.
@@ -898,6 +898,12 @@ def select_table_schema(self, path: DbPath) -> str:
898898
)
899899

900900
def query_table_schema(self, path: DbPath) -> Dict[str, tuple]:
901+
"""Query the table for its schema for table in 'path', and return {column: tuple}
902+
where the tuple is (table_name, col_name, type_repr, datetime_precision?, numeric_precision?, numeric_scale?)
903+
904+
Note: This method exists instead of select_table_schema(), just because not all databases support
905+
accessing the schema using a SQL query.
906+
"""
901907
rows = self.query(self.select_table_schema(path), list)
902908
if not rows:
903909
raise RuntimeError(f"{self.name}: Table '{'.'.join(path)}' does not exist, or has no columns")
@@ -907,6 +913,7 @@ def query_table_schema(self, path: DbPath) -> Dict[str, tuple]:
907913
return d
908914

909915
def select_table_unique_columns(self, path: DbPath) -> str:
916+
"Provide SQL for selecting the names of unique columns in the table"
910917
schema, name = self._normalize_table_path(path)
911918

912919
return (
@@ -916,6 +923,7 @@ def select_table_unique_columns(self, path: DbPath) -> str:
916923
)
917924

918925
def query_table_unique_columns(self, path: DbPath) -> List[str]:
926+
"""Query the table for its unique columns for table in 'path', and return {column}"""
919927
if not self.SUPPORTS_UNIQUE_CONSTAINT:
920928
raise NotImplementedError("This database doesn't support 'unique' constraints")
921929
res = self.query(self.select_table_unique_columns(path), List[str])
@@ -924,6 +932,14 @@ def query_table_unique_columns(self, path: DbPath) -> List[str]:
924932
def _process_table_schema(
925933
self, path: DbPath, raw_schema: Dict[str, tuple], filter_columns: Sequence[str] = None, where: str = None
926934
):
935+
"""Process the result of query_table_schema().
936+
937+
Done in a separate step, to minimize the amount of processed columns.
938+
Needed because processing each column may:
939+
* throw errors and warnings
940+
* query the database to sample values
941+
942+
"""
927943
if filter_columns is None:
928944
filtered_schema = raw_schema
929945
else:
@@ -1017,12 +1033,37 @@ def _query_conn(self, conn, sql_code: Union[str, ThreadLocalInterpreter]) -> Que
10171033
return apply_query(callback, sql_code)
10181034

10191035
def close(self):
1036+
"Close connection(s) to the database instance. Querying will stop functioning."
10201037
self.is_closed = True
10211038
return super().close()
10221039

10231040
def list_tables(self, tables_like, schema=None):
10241041
return self.query(self.dialect.list_tables(schema or self.default_schema, tables_like))
10251042

1043+
@property
1044+
@abstractmethod
1045+
def dialect(self) -> BaseDialect:
1046+
"The dialect of the database. Used internally by Database, and also available publicly."
1047+
1048+
@property
1049+
@abstractmethod
1050+
def CONNECT_URI_HELP(self) -> str:
1051+
"Example URI to show the user in help and error messages"
1052+
1053+
@property
1054+
@abstractmethod
1055+
def CONNECT_URI_PARAMS(self) -> List[str]:
1056+
"List of parameters given in the path of the URI"
1057+
1058+
@abstractmethod
1059+
def _query(self, sql_code: str) -> list:
1060+
"Send query to database and return result"
1061+
1062+
@property
1063+
@abstractmethod
1064+
def is_autocommit(self) -> bool:
1065+
"Return whether the database autocommits changes. When false, COMMIT statements are skipped."
1066+
10261067

10271068
class ThreadedDatabase(Database):
10281069
"""Access the database through singleton threads.

data_diff/schema.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
import logging
22

3+
from data_diff import Database
34
from data_diff.utils import CaseAwareMapping, CaseInsensitiveDict, CaseSensitiveDict
4-
from data_diff.abcs.database_types import AbstractDatabase, DbPath
5+
from data_diff.abcs.database_types import DbPath
56

67
logger = logging.getLogger("schema")
78

89
Schema = CaseAwareMapping
910

1011

11-
def create_schema(db: AbstractDatabase, table_path: DbPath, schema: dict, case_sensitive: bool) -> CaseAwareMapping:
12+
def create_schema(db: Database, table_path: DbPath, schema: dict, case_sensitive: bool) -> CaseAwareMapping:
1213
logger.debug(f"[{db.name}] Schema = {schema}")
1314

1415
if case_sensitive:

0 commit comments

Comments
 (0)