4040)
4141from sqlmesh .core .model .kind import TimeColumn
4242from sqlmesh .core .schema_diff import SchemaDiffer
43+ from sqlmesh .core .execution_tracker import record_execution as track_execution_record
4344from sqlmesh .utils import (
4445 CorrelationId ,
4546 columns_to_types_all_known ,
@@ -828,6 +829,7 @@ def _create_table_from_source_queries(
828829 table_description : t .Optional [str ] = None ,
829830 column_descriptions : t .Optional [t .Dict [str , str ]] = None ,
830831 table_kind : t .Optional [str ] = None ,
832+ track_row_count : bool = True ,
831833 ** kwargs : t .Any ,
832834 ) -> None :
833835 table = exp .to_table (table_name )
@@ -873,11 +875,15 @@ def _create_table_from_source_queries(
873875 replace = replace ,
874876 table_description = table_description ,
875877 table_kind = table_kind ,
878+ track_row_count = track_row_count ,
876879 ** kwargs ,
877880 )
878881 else :
879882 self ._insert_append_query (
880- table_name , query , target_columns_to_types or self .columns (table )
883+ table_name ,
884+ query ,
885+ target_columns_to_types or self .columns (table ),
886+ track_row_count = track_row_count ,
881887 )
882888
883889 # Register comments with commands if the engine supports comments and we weren't able to
@@ -901,6 +907,7 @@ def _create_table(
901907 table_description : t .Optional [str ] = None ,
902908 column_descriptions : t .Optional [t .Dict [str , str ]] = None ,
903909 table_kind : t .Optional [str ] = None ,
910+ track_row_count : bool = True ,
904911 ** kwargs : t .Any ,
905912 ) -> None :
906913 self .execute (
@@ -917,7 +924,8 @@ def _create_table(
917924 ),
918925 table_kind = table_kind ,
919926 ** kwargs ,
920- )
927+ ),
928+ track_row_count = track_row_count ,
921929 )
922930
923931 def _build_create_table_exp (
@@ -1392,6 +1400,7 @@ def insert_append(
13921400 table_name : TableName ,
13931401 query_or_df : QueryOrDF ,
13941402 target_columns_to_types : t .Optional [t .Dict [str , exp .DataType ]] = None ,
1403+ track_row_count : bool = True ,
13951404 source_columns : t .Optional [t .List [str ]] = None ,
13961405 ) -> None :
13971406 source_queries , target_columns_to_types = self ._get_source_queries_and_columns_to_types (
@@ -1400,30 +1409,39 @@ def insert_append(
14001409 target_table = table_name ,
14011410 source_columns = source_columns ,
14021411 )
1403- self ._insert_append_source_queries (table_name , source_queries , target_columns_to_types )
1412+ self ._insert_append_source_queries (
1413+ table_name , source_queries , target_columns_to_types , track_row_count
1414+ )
14041415
14051416 def _insert_append_source_queries (
14061417 self ,
14071418 table_name : TableName ,
14081419 source_queries : t .List [SourceQuery ],
14091420 target_columns_to_types : t .Optional [t .Dict [str , exp .DataType ]] = None ,
1421+ track_row_count : bool = True ,
14101422 ) -> None :
14111423 with self .transaction (condition = len (source_queries ) > 0 ):
14121424 target_columns_to_types = target_columns_to_types or self .columns (table_name )
14131425 for source_query in source_queries :
14141426 with source_query as query :
1415- self ._insert_append_query (table_name , query , target_columns_to_types )
1427+ self ._insert_append_query (
1428+ table_name , query , target_columns_to_types , track_row_count = track_row_count
1429+ )
14161430
14171431 def _insert_append_query (
14181432 self ,
14191433 table_name : TableName ,
14201434 query : Query ,
14211435 target_columns_to_types : t .Dict [str , exp .DataType ],
14221436 order_projections : bool = True ,
1437+ track_row_count : bool = True ,
14231438 ) -> None :
14241439 if order_projections :
14251440 query = self ._order_projections_and_filter (query , target_columns_to_types )
1426- self .execute (exp .insert (query , table_name , columns = list (target_columns_to_types )))
1441+ self .execute (
1442+ exp .insert (query , table_name , columns = list (target_columns_to_types )),
1443+ track_row_count = track_row_count ,
1444+ )
14271445
14281446 def insert_overwrite_by_partition (
14291447 self ,
@@ -1565,7 +1583,7 @@ def _insert_overwrite_by_condition(
15651583 )
15661584 if insert_overwrite_strategy .is_replace_where :
15671585 insert_exp .set ("where" , where or exp .true ())
1568- self .execute (insert_exp )
1586+ self .execute (insert_exp , track_row_count = True )
15691587
15701588 def update_table (
15711589 self ,
@@ -1586,7 +1604,7 @@ def _merge(
15861604 using = exp .alias_ (
15871605 exp .Subquery (this = query ), alias = MERGE_SOURCE_ALIAS , copy = False , table = True
15881606 )
1589- self .execute (exp .Merge (this = this , using = using , on = on , whens = whens ))
1607+ self .execute (exp .Merge (this = this , using = using , on = on , whens = whens ), track_row_count = True )
15901608
15911609 def scd_type_2_by_time (
15921610 self ,
@@ -2335,6 +2353,7 @@ def execute(
23352353 expressions : t .Union [str , exp .Expression , t .Sequence [exp .Expression ]],
23362354 ignore_unsupported_errors : bool = False ,
23372355 quote_identifiers : bool = True ,
2356+ track_row_count : bool = False ,
23382357 ** kwargs : t .Any ,
23392358 ) -> None :
23402359 """Execute a sql query."""
@@ -2356,7 +2375,7 @@ def execute(
23562375 expression = e if isinstance (e , exp .Expression ) else None ,
23572376 quote_identifiers = quote_identifiers ,
23582377 )
2359- self ._execute (sql , ** kwargs )
2378+ self ._execute (sql , track_row_count , ** kwargs )
23602379
23612380 def _attach_correlation_id (self , sql : str ) -> str :
23622381 if self .ATTACH_CORRELATION_ID and self .correlation_id :
@@ -2381,9 +2400,20 @@ def _log_sql(
23812400
23822401 logger .log (self ._execute_log_level , "Executing SQL: %s" , sql_to_log )
23832402
2384- def _execute (self , sql : str , ** kwargs : t .Any ) -> None :
2403+ def _execute (self , sql : str , track_row_count : bool = False , ** kwargs : t .Any ) -> None :
23852404 self .cursor .execute (sql , ** kwargs )
23862405
2406+ if track_row_count :
2407+ rowcount_raw = getattr (self .cursor , "rowcount" , None )
2408+ rowcount = None
2409+ if rowcount_raw is not None :
2410+ try :
2411+ rowcount = int (rowcount_raw )
2412+ except (TypeError , ValueError ):
2413+ pass
2414+
2415+ track_execution_record (sql , rowcount )
2416+
23872417 @contextlib .contextmanager
23882418 def temp_table (
23892419 self ,
@@ -2428,6 +2458,7 @@ def temp_table(
24282458 exists = True ,
24292459 table_description = None ,
24302460 column_descriptions = None ,
2461+ track_row_count = False ,
24312462 ** kwargs ,
24322463 )
24332464
@@ -2679,7 +2710,7 @@ def _replace_by_key(
26792710 insert_statement .set ("where" , delete_filter )
26802711 insert_statement .set ("this" , exp .to_table (target_table ))
26812712
2682- self .execute (insert_statement )
2713+ self .execute (insert_statement , track_row_count = True )
26832714 finally :
26842715 self .drop_table (temp_table )
26852716
0 commit comments