11from __future__ import annotations
22
33import abc
4+ import concurrent .futures
45import glob
56import itertools
67import linecache
78import multiprocessing as mp
89import os
910import re
1011import typing as t
12+ import concurrent
1113from collections import Counter , defaultdict
1214from dataclasses import dataclass
1315from pathlib import Path
1416from pydantic import ValidationError
15- from concurrent .futures import ProcessPoolExecutor , as_completed
1617
1718from sqlglot .errors import SqlglotError
1819from sqlglot import exp
@@ -478,20 +479,15 @@ def _load_models(
478479 audits into a Dict and creates the dag
479480 """
480481 cache = SqlMeshLoader ._Cache (self , self .config_path )
481- import time
482482
483- now = time .time ()
484483 sql_models = self ._load_sql_models (macros , jinja_macros , audits , signals , cache , gateway )
485- print ("sql models" , time .time () - now )
486- now = time .time ()
487484 external_models = self ._load_external_models (audits , cache , gateway )
488- print ("external models" , time .time () - now )
489485 python_models = self ._load_python_models (macros , jinja_macros , audits , signals )
490486
491487 all_model_names = list (sql_models ) + list (external_models ) + list (python_models )
492488 duplicates = [name for name , count in Counter (all_model_names ).items () if count > 1 ]
493489 if duplicates :
494- raise ValueError (f"Duplicate model name(s) found: { ', ' .join (duplicates )} ." )
490+ raise ConfigError (f"Duplicate model name(s) found: { ', ' .join (duplicates )} ." )
495491
496492 return UniqueKeyDict ("models" , ** sql_models , ** external_models , ** python_models )
497493
@@ -506,8 +502,7 @@ def _load_sql_models(
506502 ) -> UniqueKeyDict [str , Model ]:
507503 """Loads the sql models into a Dict"""
508504 models : UniqueKeyDict [str , Model ] = UniqueKeyDict ("models" )
509-
510- paths = set ()
505+ paths : t .Set [Path ] = set ()
511506
512507 for path in self ._glob_paths (
513508 self .config_path / c .MODELS ,
@@ -522,14 +517,11 @@ def _load_sql_models(
522517
523518 for path in paths .copy ():
524519 cached_models = cache .get (path )
525-
526520 if cached_models :
527521 paths .remove (path )
528-
529522 for model in cached_models :
530- models [model .fqn ] = model
531-
532- error = False
523+ if model .enabled :
524+ models [model .fqn ] = model
533525
534526 if paths :
535527 defaults = dict (
@@ -550,31 +542,31 @@ def _load_sql_models(
550542 default_catalog_per_gateway = self .context .default_catalog_per_gateway ,
551543 )
552544
553- with ProcessPoolExecutor (
545+ errors : t .List [str ] = []
546+ with concurrent .futures .ProcessPoolExecutor (
554547 mp_context = mp .get_context ("fork" ),
555548 initializer = _init_model_defaults ,
556549 initargs = (self .config , gateway , defaults , cache ),
557550 max_workers = c .MAX_FORK_WORKERS ,
558551 ) as pool :
559- for fut in as_completed (pool .submit (load_sql_models , path ) for path in paths ):
552+ futures_to_paths = {pool .submit (load_sql_models , path ): path for path in paths }
553+ for fut , path in futures_to_paths .items ():
560554 try :
561- path , loaded = fut .result ()
562-
555+ _ , loaded = fut .result ()
563556 if loaded :
564557 for model in loaded :
565- model ._path = path
566- models [model .fqn ] = model
558+ if model .enabled :
559+ model ._path = path
560+ models [model .fqn ] = model
567561 else :
568562 for model in cache .get (path ):
569- models [model .fqn ] = model
563+ if model .enabled :
564+ models [model .fqn ] = model
570565 except Exception as ex :
571- self ._console .log_error (
572- f"Failed to load model definition at '{ path } '.\n { ex } "
573- )
574- error = True
566+ errors .append (f"Failed to load model definition at '{ path } '.\n \n { ex } " )
575567
576- if error :
577- raise ConfigError ("Failed to load models" )
568+ if errors :
569+ raise ConfigError (f "Failed to load models\n \n { ' \n ' . join ( errors ) } " )
578570
579571 return models
580572
0 commit comments