Skip to content

Commit 75b7270

Browse files
Feat(dlt): Add support for project generation in filesystem pipelines (#4028)
1 parent e4422cb commit 75b7270

3 files changed

Lines changed: 113 additions & 20 deletions

File tree

sqlmesh/cli/example_project.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,6 @@ def _gen_config(
2929
template: ProjectTemplate,
3030
) -> str:
3131
if not settings:
32-
connection_settings = """ type: duckdb
33-
database: db.db"""
34-
3532
doc_link = "https://sqlmesh.readthedocs.io/en/stable/integrations/engines{engine_link}"
3633
engine_link = ""
3734

sqlmesh/integrations/dlt.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ def generate_dlt_models_and_settings(
1313
dialect: str,
1414
tables: t.Optional[t.List[str]] = None,
1515
dlt_path: t.Optional[str] = None,
16-
) -> t.Tuple[t.Set[t.Tuple[str, str]], str, str]:
16+
) -> t.Tuple[t.Set[t.Tuple[str, str]], t.Optional[str], str]:
1717
"""
1818
This function attaches to a DLT pipeline and retrieves the connection configs and
1919
SQLMesh models based on the tables present in the pipeline's default schema.
@@ -40,18 +40,26 @@ def generate_dlt_models_and_settings(
4040
schema = pipeline.default_schema
4141
dataset = pipeline.dataset_name
4242

43-
client = pipeline._sql_job_client(schema)
44-
config = client.config
45-
credentials = config.credentials
46-
db_type = pipeline.destination.to_name(pipeline.destination)
43+
# Get the start date from the load_ids
4744
storage_ids = list(pipeline._get_load_storage().list_loaded_packages())
48-
configs = {
49-
key: value
50-
for key in dir(credentials)
51-
if not key.startswith("_")
52-
and not callable(value := getattr(credentials, key))
53-
and value is not None
54-
}
45+
start_date = get_start_date(storage_ids)
46+
47+
# Get the connection credentials
48+
db_type = pipeline.destination.to_name(pipeline.destination)
49+
if db_type == "filesystem":
50+
connection_config = None
51+
else:
52+
client = pipeline._sql_job_client(schema)
53+
config = client.config
54+
credentials = config.credentials
55+
configs = {
56+
key: value
57+
for key in dir(credentials)
58+
if not key.startswith("_")
59+
and not callable(value := getattr(credentials, key))
60+
and value is not None
61+
}
62+
connection_config = format_config(configs, db_type)
5563

5664
dlt_tables = {
5765
name: table
@@ -117,7 +125,7 @@ def generate_dlt_models_and_settings(
117125
)
118126
sqlmesh_models.add((incremental_model_name, incremental_model_sql))
119127

120-
return sqlmesh_models, format_config(configs, db_type), get_start_date(storage_ids)
128+
return sqlmesh_models, connection_config, start_date
121129

122130

123131
def generate_dlt_models(

tests/cli/test_cli.py

Lines changed: 92 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from contextlib import contextmanager
33
from os import getcwd, path, remove
44
from pathlib import Path
5+
from shutil import rmtree
56
from click import ClickException
67
import pytest
78
from click.testing import CliRunner
@@ -801,6 +802,93 @@ def test_dlt_pipeline_errors(runner, tmp_path):
801802
assert "Error: Could not attach to pipeline" in result.output
802803

803804

805+
@time_machine.travel(FREEZE_TIME)
def test_dlt_filesystem_pipeline(tmp_path):
    """Generate a SQLMesh project from a dlt pipeline that uses the filesystem destination.

    The test runs a one-row dlt pipeline against a local ``file://`` storage
    directory, calls ``init_example_project`` with the DLT template and the
    ``athena`` dialect, and then checks that the config file and the
    incremental models were written to ``tmp_path`` with the expected content.

    The storage directory lives under the current working directory, so it is
    removed in a ``finally`` clause: a failing assertion must not leave it
    behind.
    """
    import dlt

    root_dir = path.abspath(getcwd())
    storage_path = root_dir + "/temp_storage"
    # Start from a clean slate in case an earlier run left the directory around.
    if path.exists(storage_path):
        rmtree(storage_path)

    try:
        filesystem_pipeline = dlt.pipeline(
            pipeline_name="filesystem_pipeline",
            destination=dlt.destinations.filesystem("file://" + storage_path),
        )
        info = filesystem_pipeline.run([{"item_id": 1}], table_name="equipment")
        assert not info.has_failed_jobs

        init_example_project(tmp_path, "athena", ProjectTemplate.DLT, "filesystem_pipeline")

        # Validate generated sqlmesh config and models
        config_path = tmp_path / "config.yaml"
        equipment_model_path = tmp_path / "models/incremental_equipment.sql"
        dlt_loads_model_path = tmp_path / "models/incremental__dlt_loads.sql"

        assert config_path.exists()
        assert equipment_model_path.exists()
        assert dlt_loads_model_path.exists()

        # NOTE(review): the leading whitespace inside the expected strings below
        # is reproduced exactly as it appears in this view of the commit; confirm
        # it against the files the generator actually writes.
        expected_incremental_model = """MODEL (
name filesystem_pipeline_dataset_sqlmesh.incremental_equipment,
kind INCREMENTAL_BY_TIME_RANGE (
time_column _dlt_load_time,
),
);

SELECT
CAST(c.item_id AS BIGINT) AS item_id,
CAST(c._dlt_load_id AS VARCHAR) AS _dlt_load_id,
CAST(c._dlt_id AS VARCHAR) AS _dlt_id,
TO_TIMESTAMP(CAST(c._dlt_load_id AS DOUBLE)) as _dlt_load_time
FROM
filesystem_pipeline_dataset.equipment as c
WHERE
TO_TIMESTAMP(CAST(c._dlt_load_id AS DOUBLE)) BETWEEN @start_ds AND @end_ds
"""

        incremental_model = equipment_model_path.read_text()

        assert incremental_model == expected_incremental_model

        # The filesystem destination yields no connection settings, so the
        # expected config is the commented-out template for the chosen dialect.
        expected_config = (
            "gateways:\n"
            " athena:\n"
            " connection:\n"
            " # For more information on configuring the connection to your execution engine, visit:\n"
            " # https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#connections\n"
            " # https://sqlmesh.readthedocs.io/en/stable/integrations/engines/athena/#connection-options\n"
            " type: athena\n"
            " # concurrent_tasks: 4\n"
            " # register_comments: False\n"
            " # pre_ping: False\n"
            " # pretty_sql: False\n"
            " # aws_access_key_id: \n"
            " # aws_secret_access_key: \n"
            " # role_arn: \n"
            " # role_session_name: \n"
            " # region_name: \n"
            " # work_group: \n"
            " # s3_staging_dir: \n"
            " # schema_name: \n"
            " # catalog_name: \n"
            " # s3_warehouse_location: \n\n\n"
            "default_gateway: athena\n\n"
            "model_defaults:\n"
            " dialect: athena\n"
            f" start: {yesterday_ds()}\n"
        )

        config = config_path.read_text()

        assert config == expected_config
    finally:
        # Always remove the on-disk storage, even when an assertion above fails.
        if path.exists(storage_path):
            rmtree(storage_path)
891+
804892
@time_machine.travel(FREEZE_TIME)
805893
def test_plan_dlt(runner, tmp_path):
806894
from dlt.common.pipeline import get_dlt_pipelines_dir
@@ -1039,7 +1127,7 @@ def test_environments(runner, tmp_path):
10391127
],
10401128
)
10411129
assert result.exit_code == 0
1042-
assert result.output == f"Number of SQLMesh environments are: 1\ndev - {ttl}\n"
1130+
assert f"Number of SQLMesh environments are: 1\ndev - {ttl}\n" in result.output
10431131

10441132
# # create dev2 environment from dev environment
10451133
# # Input: `y` to apply and virtual update
@@ -1070,7 +1158,7 @@ def test_environments(runner, tmp_path):
10701158
],
10711159
)
10721160
assert result.exit_code == 0
1073-
assert result.output == f"Number of SQLMesh environments are: 2\ndev - {ttl}\ndev2 - {ttl}\n"
1161+
assert f"Number of SQLMesh environments are: 2\ndev - {ttl}\ndev2 - {ttl}\n" in result.output
10741162

10751163
# Example project models have start dates, so there are no date prompts
10761164
# for the `prod` environment.
@@ -1088,6 +1176,6 @@ def test_environments(runner, tmp_path):
10881176
)
10891177
assert result.exit_code == 0
10901178
assert (
1091-
result.output
1092-
== f"Number of SQLMesh environments are: 3\ndev - {ttl}\ndev2 - {ttl}\nprod - No Expiry\n"
1179+
f"Number of SQLMesh environments are: 3\ndev - {ttl}\ndev2 - {ttl}\nprod - No Expiry\n"
1180+
in result.output
10931181
)

0 commit comments

Comments
 (0)