|
| 1 | +import typing as t |
| 2 | +from pathlib import Path |
| 3 | +import pytest |
| 4 | +import subprocess |
| 5 | +from sqlmesh.cli.example_project import init_example_project |
| 6 | +from sqlmesh.utils import yaml |
| 7 | +import shutil |
| 8 | +import site |
| 9 | + |
| 10 | +pytestmark = pytest.mark.slow |
| 11 | + |
| 12 | + |
class InvokeCliType(t.Protocol):
    """Structural type of the callable handed out by the ``invoke_cli`` fixture."""

    def __call__(
        self,
        sqlmesh_args: t.List[str],
        **kwargs: t.Any,
    ) -> subprocess.CompletedProcess:
        """Run the SQLMesh CLI with ``sqlmesh_args`` and return the finished process.

        Args:
            sqlmesh_args: Command line arguments, excluding the executable itself.
            **kwargs: Additional keyword arguments accepted by the implementation.

        Returns:
            The ``subprocess.CompletedProcess`` describing the invocation.
        """
        ...
| 17 | + |
| 18 | + |
@pytest.fixture
def invoke_cli(tmp_path: Path) -> InvokeCliType:
    """Provide a callable that runs the SQLMesh CLI inside this test's temp dir.

    The returned callable takes the CLI arguments (without the executable) plus
    optional keyword arguments that are forwarded to ``subprocess.run``. It
    returns the resulting ``subprocess.CompletedProcess``; stderr is merged into
    stdout and the output is decoded as text.

    The fixture fails immediately if no ``sqlmesh`` executable is found on PATH.
    """
    # Fetch the full path to the SQLMesh binary so that when we use `cwd` to run in the
    # context of a test dir, the correct SQLMesh binary is executed.
    # This will be the current project because `make install-dev` installs an editable
    # version of SQLMesh into the current python environment.
    # `shutil.which` performs the PATH lookup in-process, so we do not depend on an
    # external `which` program being available.
    sqlmesh_bin = shutil.which("sqlmesh")
    if sqlmesh_bin is None:
        # Fail here with a clear message rather than letting every CLI invocation
        # fail later while trying to execute an empty path.
        pytest.fail("Could not find the `sqlmesh` executable on PATH")

    def _invoke(sqlmesh_args: t.List[str], **kwargs: t.Any) -> subprocess.CompletedProcess:
        return subprocess.run(
            args=[sqlmesh_bin] + sqlmesh_args,
            # set the working directory to the isolated temp dir for this test
            cwd=tmp_path,
            # return text instead of binary from the output streams
            text=True,
            # combine stdout/stderr into a single stream
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            **kwargs,
        )

    return _invoke
| 41 | + |
| 42 | + |
def test_load_snapshots_that_reference_nonexistent_python_libraries(
    invoke_cli: InvokeCliType, tmp_path: Path
) -> None:
    """Check that a plan still succeeds when a stored snapshot needs a missing library.

    The test applies a plan whose model uses a macro imported from a throwaway
    package placed in site-packages, then deletes that package and redefines the
    macro inline. The second plan is expected to exit successfully while the
    import failure for the old snapshot is written to the log file.
    """
    init_example_project(tmp_path, dialect="duckdb")
    config_path = tmp_path / "config.yaml"

    # we need state to persist between invocations
    config_dict = yaml.load(config_path)
    config_dict["gateways"]["duckdb"]["state_connection"] = {
        "type": "duckdb",
        "database": str(tmp_path / "state.db"),
    }
    config_path.write_text(yaml.dump(config_dict))

    # simulate a 3rd party library that provides a macro
    site_packages = site.getsitepackages()[0]
    sqlmesh_test_macros_package_path = Path(site_packages) / "sqlmesh_test_macros"
    sqlmesh_test_macros_package_path.mkdir()
    try:
        (sqlmesh_test_macros_package_path / "macros.py").write_text("""
from sqlmesh import macro

@macro()
def do_something(evaluator):
    return "'value from site-packages'"
""")

        # reference the macro from site-packages
        (tmp_path / "macros" / "__init__.py").write_text("""
from sqlmesh_test_macros.macros import do_something
""")

        (tmp_path / "models" / "example.sql").write_text("""
MODEL (
    name example.test_model,
    kind FULL
);

select @do_something() as a
""")

        result = invoke_cli(["plan", "--no-prompts", "--auto-apply", "--skip-tests"])

        assert result.returncode == 0
        assert "Physical layer updated" in result.stdout
        assert "Virtual layer updated" in result.stdout

        # render the query to ensure our macro is being invoked
        result = invoke_cli(["render", "example.test_model"])
        assert result.returncode == 0
        assert """SELECT 'value from site-packages' AS "a\"""" in " ".join(result.stdout.split())

        # clear cache to ensure we are forced to reload everything
        assert invoke_cli(["clean"]).returncode == 0
    finally:
        # deleting this removes the 'do_something()' macro used by the version of the snapshot stored in state
        # when loading the old snapshot from state in the local python env, this will create an ImportError
        #
        # This runs in `finally` so the fake package never lingers in the real site-packages
        # when one of the steps above fails; a leftover directory would pollute the
        # environment and make the next run fail at `mkdir()`.
        shutil.rmtree(sqlmesh_test_macros_package_path)

    # Move the macro inline so it's no longer being loaded from a library but still exists with the same signature
    (tmp_path / "macros" / "__init__.py").write_text("""
from sqlmesh import macro

@macro()
def do_something(evaluator):
    return "'some value not from site-packages'"
""")

    # this should produce an error but not a fatal one. there will be an error rendering the optimized query of the old snapshot, which should be logged
    result = invoke_cli(
        [
            "plan",
            "--no-prompts",
            "--auto-apply",
            "--skip-tests",
        ]
    )
    assert result.returncode == 0
    assert "Physical layer updated" in result.stdout
    assert "Virtual layer updated" in result.stdout

    # pick the log file whose path sorts last
    log_file = max((tmp_path / "logs").iterdir())
    log_file_contents = log_file.read_text()
    assert "ModuleNotFoundError: No module named 'sqlmesh_test_macros'" in log_file_contents
    assert (
        "ERROR - Failed to cache optimized query for model 'example.test_model'"
        in log_file_contents
    )
    assert (
        'ERROR - Failed to cache snapshot SnapshotId<"db"."example"."test_model"'
        in log_file_contents
    )
0 commit comments