116116 run_tests ,
117117)
118118from sqlmesh .core .user import User
119- from sqlmesh .utils import UniqueKeyDict , Verbosity , CorrelationId
119+ from sqlmesh .utils import UniqueKeyDict , Verbosity
120120from sqlmesh .utils .concurrency import concurrent_apply_to_values
121121from sqlmesh .utils .dag import DAG
122122from sqlmesh .utils .date import (
@@ -417,7 +417,7 @@ def __init__(
417417 self .config .get_state_connection (self .gateway ) or self .connection_config
418418 )
419419
420- self ._snapshot_evaluators : t .Dict [ t . Optional [CorrelationId ], SnapshotEvaluator ] = {}
420+ self ._snapshot_evaluator : t .Optional [SnapshotEvaluator ] = None
421421
422422 self .console = get_console ()
423423 setattr (self .console , "dialect" , self .config .dialect )
@@ -445,22 +445,18 @@ def engine_adapter(self) -> EngineAdapter:
445445 self ._engine_adapter = self .connection_config .create_engine_adapter ()
446446 return self ._engine_adapter
447447
448- def snapshot_evaluator (
449- self , correlation_id : t .Optional [CorrelationId ] = None
450- ) -> SnapshotEvaluator :
451- # Cache snapshot evaluators by correlation_id to avoid old correlation_ids being attached to future Context operations
452- if correlation_id not in self ._snapshot_evaluators :
453- self ._snapshot_evaluators [correlation_id ] = SnapshotEvaluator (
448+ @property
449+ def snapshot_evaluator (self ) -> SnapshotEvaluator :
450+ if not self ._snapshot_evaluator :
451+ self ._snapshot_evaluator = SnapshotEvaluator (
454452 {
455- gateway : adapter .with_settings (
456- log_level = logging .INFO , correlation_id = correlation_id
457- )
453+ gateway : adapter .with_settings (log_level = logging .INFO )
458454 for gateway , adapter in self .engine_adapters .items ()
459455 },
460456 ddl_concurrent_tasks = self .concurrent_tasks ,
461457 selected_gateway = self .selected_gateway ,
462458 )
463- return self ._snapshot_evaluators [ correlation_id ]
459+ return self ._snapshot_evaluator
464460
465461 def execution_context (
466462 self ,
@@ -539,10 +535,10 @@ def scheduler(self, environment: t.Optional[str] = None) -> Scheduler:
539535 if not snapshots :
540536 raise ConfigError ("No models were found" )
541537
542- return self .create_scheduler (snapshots )
538+ return self .create_scheduler (snapshots , self . snapshot_evaluator )
543539
544540 def create_scheduler (
545- self , snapshots : t .Iterable [Snapshot ], correlation_id : t . Optional [ CorrelationId ] = None
541+ self , snapshots : t .Iterable [Snapshot ], snapshot_evaluator : SnapshotEvaluator
546542 ) -> Scheduler :
547543 """Creates the built-in scheduler.
548544
@@ -554,7 +550,7 @@ def create_scheduler(
554550 """
555551 return Scheduler (
556552 snapshots ,
557- self . snapshot_evaluator ( correlation_id ) ,
553+ snapshot_evaluator ,
558554 self .state_sync ,
559555 default_catalog = self .default_catalog ,
560556 max_workers = self .concurrent_tasks ,
@@ -719,7 +715,7 @@ def run(
719715 NotificationEvent .RUN_START , environment = environment
720716 )
721717 analytics_run_id = analytics .collector .on_run_start (
722- engine_type = self .snapshot_evaluator () .adapter .dialect ,
718+ engine_type = self .snapshot_evaluator .adapter .dialect ,
723719 state_sync_type = self .state_sync .state_type (),
724720 )
725721 self ._load_materializations ()
@@ -1081,7 +1077,7 @@ def evaluate(
10811077 and not parent_snapshot .categorized
10821078 ]
10831079
1084- df = self .snapshot_evaluator () .evaluate_and_fetch (
1080+ df = self .snapshot_evaluator .evaluate_and_fetch (
10851081 snapshot ,
10861082 start = start ,
10871083 end = end ,
@@ -1593,12 +1589,7 @@ def apply(
15931589 default_catalog = self .default_catalog ,
15941590 console = self .console ,
15951591 )
1596- explainer .evaluate (
1597- plan .to_evaluatable (),
1598- snapshot_evaluator = self .snapshot_evaluator (
1599- correlation_id = CorrelationId .from_plan_id (plan .plan_id )
1600- ),
1601- )
1592+ explainer .evaluate (plan .to_evaluatable ())
16021593 return
16031594
16041595 self .notification_target_manager .notify (
@@ -2121,7 +2112,7 @@ def audit(
21212112 errors = []
21222113 skipped_count = 0
21232114 for snapshot in snapshots :
2124- for audit_result in self .snapshot_evaluator () .audit (
2115+ for audit_result in self .snapshot_evaluator .audit (
21252116 snapshot = snapshot ,
21262117 start = start ,
21272118 end = end ,
@@ -2153,7 +2144,7 @@ def audit(
21532144 self .console .log_status_update (f"Got { error .count } results, expected 0." )
21542145 if error .query :
21552146 self .console .show_sql (
2156- f"{ error .query .sql (dialect = self .snapshot_evaluator () .adapter .dialect )} "
2147+ f"{ error .query .sql (dialect = self .snapshot_evaluator .adapter .dialect )} "
21572148 )
21582149
21592150 self .console .log_status_update ("Done." )
@@ -2345,14 +2336,12 @@ def print_environment_names(self) -> None:
23452336
23462337 def close (self ) -> None :
23472338 """Releases all resources allocated by this context."""
2348- for evaluator in self ._snapshot_evaluators . values () :
2349- evaluator .close ()
2339+ if self ._snapshot_evaluator :
2340+ self . _snapshot_evaluator .close ()
23502341
23512342 if self ._state_sync :
23522343 self ._state_sync .close ()
23532344
2354- self ._snapshot_evaluators .clear ()
2355-
23562345 def _run (
23572346 self ,
23582347 environment : str ,
@@ -2403,11 +2392,7 @@ def _run(
24032392
24042393 def _apply (self , plan : Plan , circuit_breaker : t .Optional [t .Callable [[], bool ]]) -> None :
24052394 self ._scheduler .create_plan_evaluator (self ).evaluate (
2406- plan .to_evaluatable (),
2407- snapshot_evaluator = self .snapshot_evaluator (
2408- correlation_id = CorrelationId .from_plan_id (plan .plan_id )
2409- ),
2410- circuit_breaker = circuit_breaker ,
2395+ plan .to_evaluatable (), circuit_breaker = circuit_breaker
24112396 )
24122397
24132398 @python_api_analytics
@@ -2700,7 +2685,7 @@ def _run_janitor(self, ignore_ttl: bool = False) -> None:
27002685 )
27012686
27022687 # Remove the expired snapshots tables
2703- self .snapshot_evaluator () .cleanup (
2688+ self .snapshot_evaluator .cleanup (
27042689 target_snapshots = cleanup_targets ,
27052690 on_complete = self .console .update_cleanup_progress ,
27062691 )
0 commit comments