Skip to content

Commit 85ed2e0

Browse files
refactors and improvements
1 parent 08ba90c commit 85ed2e0

6 files changed

Lines changed: 29 additions & 12 deletions

File tree

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ engine-up: engine-clickhouse-up engine-mssql-up engine-mysql-up engine-postgres-
6464
engine-down: engine-clickhouse-down engine-mssql-down engine-mysql-down engine-postgres-down engine-spark-down engine-trino-down
6565

6666
fast-test:
67-
pytest -n auto -m "fast and not cicdonly" && pytest -m "isolated" && pytest -m "isolated2"
67+
pytest -n auto -m "fast and not cicdonly" && pytest -m "isolated" && pytest -m "registry_isolation"
6868

6969
slow-test:
7070
pytest -n auto -m "(fast or slow) and not cicdonly" && pytest -m "isolated"

pytest.ini

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ markers =
88
remote: test that involves interacting with a remote DB
99
cicdonly: test that only runs on CI/CD
1010
isolated: tests that need to run sequentially usually because they use fork
11-
isolated2: tests that need to run isolated because they interfere
11+
registry_isolation: tests that need to run isolated because they manipulate global registries or state
1212

1313
# Test Domain Markers
1414
# default: core functionality

sqlmesh/core/loader.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
from sqlmesh.core import constants as c
2121
from sqlmesh.core.audit import Audit, ModelAudit, StandaloneAudit, load_multiple_audits
22+
from sqlmesh.core.console import Console
2223
from sqlmesh.core.dialect import parse
2324
from sqlmesh.core.environment import EnvironmentStatements
2425
from sqlmesh.core.linter.rule import Rule
@@ -85,25 +86,32 @@ def get(self, path: Path) -> t.List[Model]:
8586
def _init_model_defaults(
8687
config: Config,
8788
selected_gateway: t.Optional[str],
88-
defaults: t.Optional[t.Dict[str, t.Any]] = None,
89+
model_loading_defaults: t.Optional[t.Dict[str, t.Any]] = None,
8990
cache: t.Optional[CacheBase] = None,
91+
console: t.Optional[Console] = None,
9092
) -> None:
9193
global _defaults, _cache, _config, _selected_gateway
92-
_defaults = defaults
94+
_defaults = model_loading_defaults
9395
_cache = cache
9496
_config = config
9597
_selected_gateway = selected_gateway
9698

99+
# Set the console passed from the parent process
100+
if console is not None:
101+
from sqlmesh.core.console import set_console
97102

98-
def load_sql_models(path: Path) -> t.Tuple[Path, list[Model]]:
103+
set_console(console)
104+
105+
106+
def load_sql_models(path: Path) -> t.List[Model]:
99107
assert _defaults
100108
assert _cache
101109

102110
with open(path, "r", encoding="utf-8") as file:
103111
expressions = parse(file.read(), default_dialect=_defaults["dialect"])
104112
models = load_sql_based_models(expressions, path=Path(path).absolute(), **_defaults)
105113

106-
return (path, [] if _cache.put(models, path) else models)
114+
return [] if _cache.put(models, path) else models
107115

108116

109117
def get_variables(gateway_name: t.Optional[str] = None) -> t.Dict[str, t.Any]:
@@ -528,7 +536,7 @@ def _load_sql_models(
528536
models[model.fqn] = model
529537

530538
if paths:
531-
defaults = dict(
539+
model_loading_defaults = dict(
532540
get_variables=get_variables,
533541
defaults=self.config.model_defaults.dict(),
534542
macros=macros,
@@ -549,14 +557,14 @@ def _load_sql_models(
549557
errors: t.List[str] = []
550558
with create_process_pool_executor(
551559
initializer=_init_model_defaults,
552-
initargs=(self.config, gateway, defaults, cache),
560+
initargs=(self.config, gateway, model_loading_defaults, cache, self._console),
553561
max_workers=c.MAX_FORK_WORKERS,
554562
) as pool:
555563
futures_to_paths = {pool.submit(load_sql_models, path): path for path in paths}
556564
for future in concurrent.futures.as_completed(futures_to_paths):
557565
path = futures_to_paths[future]
558566
try:
559-
_, loaded = future.result()
567+
loaded = future.result()
560568
for model in loaded or cache.get(path):
561569
if model.fqn in models:
562570
errors.append(
@@ -572,7 +580,7 @@ def _load_sql_models(
572580

573581
if errors:
574582
error_string = "\n".join(errors)
575-
raise ConfigError(f"Failed to load models\n\n{error_string}")
583+
raise ConfigError(error_string)
576584

577585
return models
578586

sqlmesh/utils/process.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,17 @@ def __enter__(self):
2424
return self
2525

2626
def __exit__(self, *args):
27+
self.shutdown(wait=True)
2728
return True
2829

30+
def shutdown(self, wait=True, cancel_futures=False):
31+
"""No-op method to match ProcessPoolExecutor API.
32+
33+
Since this executor runs synchronously, there are no background processes
34+
or resources to shut down and all futures will have completed already.
35+
"""
36+
pass
37+
2938
def submit(self, fn, *args, **kwargs):
3039
"""Execute the function synchronously and return a Future with the result."""
3140
future = Future()

tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ def validate(
183183

184184

185185
def pytest_collection_modifyitems(items, *args, **kwargs):
186-
test_type_markers = {"fast", "slow", "docker", "remote", "isolated", "isolated2"}
186+
test_type_markers = {"fast", "slow", "docker", "remote", "isolated", "registry_isolation"}
187187
for item in items:
188188
for marker in item.iter_markers():
189189
if marker.name in test_type_markers:

tests/core/test_loader.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ def duplicate_model_path(fpath):
115115
Context(paths=tmp_path, config=config)
116116

117117

118-
@pytest.mark.isolated2
118+
@pytest.mark.registry_isolation
119119
def test_duplicate_python_model_names_raise_error(tmp_path: Path) -> None:
120120
"""Test python models with duplicate model names raises ConfigError if the functions are not identical."""
121121
init_example_project(tmp_path, dialect="duckdb")

0 commit comments

Comments
 (0)