Skip to content

Commit c1c312f

Browse files
committed
Add BQ support and track bytes processed
1 parent 531a082 commit c1c312f

19 files changed

Lines changed: 193 additions & 116 deletions

File tree

sqlmesh/core/console.py

Lines changed: 74 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from sqlmesh.core.environment import EnvironmentNamingInfo, EnvironmentSummary
3333
from sqlmesh.core.linter.rule import RuleViolation
3434
from sqlmesh.core.model import Model
35+
from sqlmesh.core.execution_tracker import QueryExecutionStats
3536
from sqlmesh.core.snapshot import (
3637
Snapshot,
3738
SnapshotChangeCategory,
@@ -439,7 +440,7 @@ def update_snapshot_evaluation_progress(
439440
num_audits_passed: int,
440441
num_audits_failed: int,
441442
audit_only: bool = False,
442-
rows_processed: t.Optional[int] = None,
443+
execution_stats: t.Optional[QueryExecutionStats] = None,
443444
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
444445
) -> None:
445446
"""Updates the snapshot evaluation progress."""
@@ -588,7 +589,7 @@ def update_snapshot_evaluation_progress(
588589
num_audits_passed: int,
589590
num_audits_failed: int,
590591
audit_only: bool = False,
591-
rows_processed: t.Optional[int] = None,
592+
execution_stats: t.Optional[QueryExecutionStats] = None,
592593
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
593594
) -> None:
594595
pass
@@ -1035,7 +1036,7 @@ def start_evaluation_progress(
10351036
# determine column widths
10361037
self.evaluation_column_widths["annotation"] = (
10371038
_calculate_annotation_str_len(
1038-
batched_intervals, self.AUDIT_PADDING, len(" (XXXXXX rows processed)")
1039+
batched_intervals, self.AUDIT_PADDING, len(" (123.4m rows, 123.4 KiB)")
10391040
)
10401041
+ 3 # brackets and opening escape backslash
10411042
)
@@ -1081,7 +1082,7 @@ def update_snapshot_evaluation_progress(
10811082
num_audits_passed: int,
10821083
num_audits_failed: int,
10831084
audit_only: bool = False,
1084-
rows_processed: t.Optional[int] = None,
1085+
execution_stats: t.Optional[QueryExecutionStats] = None,
10851086
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
10861087
) -> None:
10871088
"""Update the snapshot evaluation progress."""
@@ -1102,7 +1103,7 @@ def update_snapshot_evaluation_progress(
11021103
).ljust(self.evaluation_column_widths["name"])
11031104

11041105
annotation = _create_evaluation_model_annotation(
1105-
snapshot, _format_evaluation_model_interval(snapshot, interval), rows_processed
1106+
snapshot, _format_evaluation_model_interval(snapshot, interval), execution_stats
11061107
)
11071108
audits_str = ""
11081109
if num_audits_passed:
@@ -3673,7 +3674,7 @@ def update_snapshot_evaluation_progress(
36733674
num_audits_passed: int,
36743675
num_audits_failed: int,
36753676
audit_only: bool = False,
3676-
rows_processed: t.Optional[int] = None,
3677+
execution_stats: t.Optional[QueryExecutionStats] = None,
36773678
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
36783679
) -> None:
36793680
view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id]
@@ -3844,7 +3845,7 @@ def update_snapshot_evaluation_progress(
38443845
num_audits_passed: int,
38453846
num_audits_failed: int,
38463847
audit_only: bool = False,
3847-
rows_processed: t.Optional[int] = None,
3848+
execution_stats: t.Optional[QueryExecutionStats] = None,
38483849
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
38493850
) -> None:
38503851
message = f"Evaluated {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
@@ -4179,11 +4180,27 @@ def _format_evaluation_model_interval(snapshot: Snapshot, interval: Interval) ->
41794180

41804181

41814182
def _create_evaluation_model_annotation(
4182-
snapshot: Snapshot, interval_info: t.Optional[str], rows_processed: t.Optional[int]
4183+
snapshot: Snapshot,
4184+
interval_info: t.Optional[str],
4185+
execution_stats: t.Optional[QueryExecutionStats],
41834186
) -> str:
41844187
annotation = None
4185-
num_rows_processed = str(rows_processed) if rows_processed else ""
4186-
rows_processed_str = f" ({num_rows_processed} rows)" if num_rows_processed else ""
4188+
execution_stats_str = ""
4189+
if execution_stats:
4190+
rows_processed = execution_stats.total_rows_processed
4191+
execution_stats_str += (
4192+
f"{_abbreviate_integer_count(rows_processed)} row{'s' if rows_processed > 1 else ''}"
4193+
if rows_processed
4194+
else ""
4195+
)
4196+
4197+
bytes_processed = execution_stats.total_bytes_processed
4198+
execution_stats_str += (
4199+
f"{', ' if execution_stats_str else ''}{_format_bytes(bytes_processed)}"
4200+
if bytes_processed
4201+
else ""
4202+
)
4203+
execution_stats_str = f" ({execution_stats_str})" if execution_stats_str else ""
41874204

41884205
if snapshot.is_audit:
41894206
annotation = "run standalone audit"
@@ -4193,30 +4210,32 @@ def _create_evaluation_model_annotation(
41934210
if snapshot.model.kind.is_view:
41944211
annotation = "recreate view"
41954212
if snapshot.model.kind.is_seed:
4196-
annotation = f"insert seed file{rows_processed_str}"
4213+
annotation = f"insert seed file{execution_stats_str}"
41974214
if snapshot.model.kind.is_full:
4198-
annotation = f"full refresh{rows_processed_str}"
4215+
annotation = f"full refresh{execution_stats_str}"
41994216
if snapshot.model.kind.is_incremental_by_unique_key:
4200-
annotation = f"insert/update rows{rows_processed_str}"
4217+
annotation = f"insert/update rows{execution_stats_str}"
42014218
if snapshot.model.kind.is_incremental_by_partition:
4202-
annotation = f"insert partitions{rows_processed_str}"
4219+
annotation = f"insert partitions{execution_stats_str}"
42034220

42044221
if annotation:
42054222
return annotation
42064223

4207-
return f"{interval_info}{rows_processed_str}" if interval_info else ""
4224+
return f"{interval_info}{execution_stats_str}" if interval_info else ""
42084225

42094226

42104227
def _calculate_interval_str_len(
4211-
snapshot: Snapshot, intervals: t.List[Interval], rows_processed: t.Optional[int] = None
4228+
snapshot: Snapshot,
4229+
intervals: t.List[Interval],
4230+
execution_stats: t.Optional[QueryExecutionStats] = None,
42124231
) -> int:
42134232
interval_str_len = 0
42144233
for interval in intervals:
42154234
interval_str_len = max(
42164235
interval_str_len,
42174236
len(
42184237
_create_evaluation_model_annotation(
4219-
snapshot, _format_evaluation_model_interval(snapshot, interval), rows_processed
4238+
snapshot, _format_evaluation_model_interval(snapshot, interval), execution_stats
42204239
)
42214240
),
42224241
)
@@ -4271,14 +4290,50 @@ def _calculate_audit_str_len(snapshot: Snapshot, audit_padding: int = 0) -> int:
42714290
def _calculate_annotation_str_len(
    batched_intervals: t.Dict[Snapshot, t.List[Interval]],
    audit_padding: int = 0,
    execution_stats_len: int = 0,
) -> int:
    """Return the widest annotation string width across all snapshots.

    For every snapshot, the width is its longest interval annotation plus its
    audit summary width plus a fixed allowance for the execution-stats suffix.

    Args:
        batched_intervals: The intervals to be evaluated, keyed by snapshot.
        audit_padding: Extra characters reserved per audit entry.
        execution_stats_len: Fixed width reserved for the
            "(rows, bytes)" suffix appended to annotations.

    Returns:
        The maximum annotation width, or 0 when there are no snapshots.
    """
    per_snapshot_widths = (
        _calculate_interval_str_len(snapshot, intervals)
        + _calculate_audit_str_len(snapshot, audit_padding)
        + execution_stats_len
        for snapshot, intervals in batched_intervals.items()
    )
    return max(per_snapshot_widths, default=0)
4304+
4305+
4306+
# Convert number of bytes to a human-readable string
4307+
# https://github.com/dbt-labs/dbt-adapters/blob/34fd178539dcb6f82e18e738adc03de7784c032f/dbt-bigquery/src/dbt/adapters/bigquery/connections.py#L165
4308+
def _format_bytes(num_bytes: t.Optional[int]) -> str:
4309+
if num_bytes and num_bytes > 0:
4310+
if num_bytes < 1024:
4311+
return f"{num_bytes} Bytes"
4312+
4313+
num_bytes_float = float(num_bytes) / 1024.0
4314+
for unit in ["KiB", "MiB", "GiB", "TiB", "PiB"]:
4315+
if num_bytes_float < 1024.0:
4316+
return f"{num_bytes_float:3.1f} {unit}"
4317+
num_bytes_float /= 1024.0
4318+
4319+
num_bytes_float *= 1024.0 # undo last division in loop
4320+
return f"{num_bytes_float:3.1f} {unit}"
4321+
return ""
4322+
4323+
4324+
# Abbreviate integer count. Example: 1,000,000,000 -> 1b
4325+
# https://github.com/dbt-labs/dbt-adapters/blob/34fd178539dcb6f82e18e738adc03de7784c032f/dbt-bigquery/src/dbt/adapters/bigquery/connections.py#L178
4326+
def _abbreviate_integer_count(count: t.Optional[int]) -> str:
4327+
if count and count > 0:
4328+
if count < 1000:
4329+
return str(count)
4330+
4331+
count_float = float(count) / 1000.0
4332+
for unit in ["k", "m", "b", "t"]:
4333+
if count_float < 1000.0:
4334+
return f"{count_float:3.1f}{unit}".strip()
4335+
count_float /= 1000.0
4336+
4337+
count_float *= 1000.0 # undo last division in loop
4338+
return f"{count_float:3.1f}{unit}".strip()
4339+
return ""

sqlmesh/core/engine_adapter/base.py

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -845,7 +845,7 @@ def _create_table_from_source_queries(
845845
table_description: t.Optional[str] = None,
846846
column_descriptions: t.Optional[t.Dict[str, str]] = None,
847847
table_kind: t.Optional[str] = None,
848-
track_row_count: bool = True,
848+
track_execution_stats: bool = True,
849849
**kwargs: t.Any,
850850
) -> None:
851851
table = exp.to_table(table_name)
@@ -891,15 +891,15 @@ def _create_table_from_source_queries(
891891
replace=replace,
892892
table_description=table_description,
893893
table_kind=table_kind,
894-
track_row_count=track_row_count,
894+
track_execution_stats=track_execution_stats,
895895
**kwargs,
896896
)
897897
else:
898898
self._insert_append_query(
899899
table_name,
900900
query,
901901
target_columns_to_types or self.columns(table),
902-
track_row_count=track_row_count,
902+
track_execution_stats=track_execution_stats,
903903
)
904904

905905
# Register comments with commands if the engine supports comments and we weren't able to
@@ -923,7 +923,7 @@ def _create_table(
923923
table_description: t.Optional[str] = None,
924924
column_descriptions: t.Optional[t.Dict[str, str]] = None,
925925
table_kind: t.Optional[str] = None,
926-
track_row_count: bool = True,
926+
track_execution_stats: bool = True,
927927
**kwargs: t.Any,
928928
) -> None:
929929
self.execute(
@@ -941,7 +941,7 @@ def _create_table(
941941
table_kind=table_kind,
942942
**kwargs,
943943
),
944-
track_row_count=track_row_count,
944+
track_execution_stats=track_execution_stats,
945945
)
946946

947947
def _build_create_table_exp(
@@ -1429,7 +1429,7 @@ def insert_append(
14291429
table_name: TableName,
14301430
query_or_df: QueryOrDF,
14311431
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1432-
track_row_count: bool = True,
1432+
track_execution_stats: bool = True,
14331433
source_columns: t.Optional[t.List[str]] = None,
14341434
) -> None:
14351435
source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
@@ -1439,22 +1439,25 @@ def insert_append(
14391439
source_columns=source_columns,
14401440
)
14411441
self._insert_append_source_queries(
1442-
table_name, source_queries, target_columns_to_types, track_row_count
1442+
table_name, source_queries, target_columns_to_types, track_execution_stats
14431443
)
14441444

14451445
def _insert_append_source_queries(
14461446
self,
14471447
table_name: TableName,
14481448
source_queries: t.List[SourceQuery],
14491449
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1450-
track_row_count: bool = True,
1450+
track_execution_stats: bool = True,
14511451
) -> None:
14521452
with self.transaction(condition=len(source_queries) > 0):
14531453
target_columns_to_types = target_columns_to_types or self.columns(table_name)
14541454
for source_query in source_queries:
14551455
with source_query as query:
14561456
self._insert_append_query(
1457-
table_name, query, target_columns_to_types, track_row_count=track_row_count
1457+
table_name,
1458+
query,
1459+
target_columns_to_types,
1460+
track_execution_stats=track_execution_stats,
14581461
)
14591462

14601463
def _insert_append_query(
@@ -1463,13 +1466,13 @@ def _insert_append_query(
14631466
query: Query,
14641467
target_columns_to_types: t.Dict[str, exp.DataType],
14651468
order_projections: bool = True,
1466-
track_row_count: bool = True,
1469+
track_execution_stats: bool = True,
14671470
) -> None:
14681471
if order_projections:
14691472
query = self._order_projections_and_filter(query, target_columns_to_types)
14701473
self.execute(
14711474
exp.insert(query, table_name, columns=list(target_columns_to_types)),
1472-
track_row_count=track_row_count,
1475+
track_execution_stats=track_execution_stats,
14731476
)
14741477

14751478
def insert_overwrite_by_partition(
@@ -1612,7 +1615,7 @@ def _insert_overwrite_by_condition(
16121615
)
16131616
if insert_overwrite_strategy.is_replace_where:
16141617
insert_exp.set("where", where or exp.true())
1615-
self.execute(insert_exp, track_row_count=True)
1618+
self.execute(insert_exp, track_execution_stats=True)
16161619

16171620
def update_table(
16181621
self,
@@ -1633,7 +1636,9 @@ def _merge(
16331636
using = exp.alias_(
16341637
exp.Subquery(this=query), alias=MERGE_SOURCE_ALIAS, copy=False, table=True
16351638
)
1636-
self.execute(exp.Merge(this=this, using=using, on=on, whens=whens), track_row_count=True)
1639+
self.execute(
1640+
exp.Merge(this=this, using=using, on=on, whens=whens), track_execution_stats=True
1641+
)
16371642

16381643
def scd_type_2_by_time(
16391644
self,
@@ -2382,7 +2387,7 @@ def execute(
23822387
expressions: t.Union[str, exp.Expression, t.Sequence[exp.Expression]],
23832388
ignore_unsupported_errors: bool = False,
23842389
quote_identifiers: bool = True,
2385-
track_row_count: bool = False,
2390+
track_execution_stats: bool = False,
23862391
**kwargs: t.Any,
23872392
) -> None:
23882393
"""Execute a sql query."""
@@ -2404,7 +2409,7 @@ def execute(
24042409
expression=e if isinstance(e, exp.Expression) else None,
24052410
quote_identifiers=quote_identifiers,
24062411
)
2407-
self._execute(sql, track_row_count, **kwargs)
2412+
self._execute(sql, track_execution_stats, **kwargs)
24082413

24092414
def _attach_correlation_id(self, sql: str) -> str:
24102415
if self.ATTACH_CORRELATION_ID and self.correlation_id:
@@ -2429,12 +2434,12 @@ def _log_sql(
24292434

24302435
logger.log(self._execute_log_level, "Executing SQL: %s", sql_to_log)
24312436

2432-
def _execute(self, sql: str, track_row_count: bool = False, **kwargs: t.Any) -> None:
2437+
def _execute(self, sql: str, track_execution_stats: bool = False, **kwargs: t.Any) -> None:
24332438
self.cursor.execute(sql, **kwargs)
24342439

24352440
if (
24362441
self.SUPPORTS_QUERY_EXECUTION_TRACKING
2437-
and track_row_count
2442+
and track_execution_stats
24382443
and QueryExecutionTracker.is_tracking()
24392444
):
24402445
rowcount_raw = getattr(self.cursor, "rowcount", None)
@@ -2445,7 +2450,7 @@ def _execute(self, sql: str, track_row_count: bool = False, **kwargs: t.Any) ->
24452450
except (TypeError, ValueError):
24462451
pass
24472452

2448-
QueryExecutionTracker.record_execution(sql, rowcount)
2453+
QueryExecutionTracker.record_execution(sql, rowcount, None)
24492454

24502455
@contextlib.contextmanager
24512456
def temp_table(
@@ -2491,7 +2496,7 @@ def temp_table(
24912496
exists=True,
24922497
table_description=None,
24932498
column_descriptions=None,
2494-
track_row_count=False,
2499+
track_execution_stats=False,
24952500
**kwargs,
24962501
)
24972502

@@ -2743,7 +2748,7 @@ def _replace_by_key(
27432748
insert_statement.set("where", delete_filter)
27442749
insert_statement.set("this", exp.to_table(target_table))
27452750

2746-
self.execute(insert_statement, track_row_count=True)
2751+
self.execute(insert_statement, track_execution_stats=True)
27472752
finally:
27482753
self.drop_table(temp_table)
27492754

0 commit comments

Comments
 (0)