Skip to content

Commit 8ccf84b

Browse files
Feat(dlt): Add support to override the dlt pipelines directory (#3984)
1 parent f4cb59b commit 8ccf84b

6 files changed

Lines changed: 88 additions & 12 deletions

File tree

docs/integrations/dlt.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,14 @@ This will create the configuration file and directories, which are found in all
2828

2929
SQLMesh will also automatically generate models to ingest data from the pipeline incrementally. Incremental loading is ideal for large datasets where recomputing entire tables is resource-intensive. In this case, SQLMesh utilizes the [`INCREMENTAL_BY_TIME_RANGE` model kind](../concepts/models/model_kinds.md#incremental_by_time_range). However, these model definitions can be customized to meet your specific project needs.
3030

31+
#### Specify the path to the pipelines directory
32+
33+
The default location for dlt pipelines is `~/.dlt/pipelines/<pipeline_name>`. If your pipelines are in a [different directory](https://dlthub.com/docs/general-usage/pipeline#separate-working-environments-with-pipelines_dir), use the `--dlt-path` argument to specify the path explicitly:
34+
35+
```bash
36+
$ sqlmesh init -t dlt --dlt-pipeline <pipeline-name> --dlt-path <pipelines-directory> <dialect>
37+
```
38+
3139
### Generating models on demand
3240

3341
To update the models in your SQLMesh project on demand, use the `dlt_refresh` command. This allows you to either specify individual tables to generate incremental models from or update all models at once.
@@ -50,6 +58,12 @@ $ sqlmesh dlt_refresh <pipeline-name> --force
5058
$ sqlmesh dlt_refresh <pipeline-name> --table <dlt-table>
5159
```
5260

61+
- **Provide the explicit path to the pipelines directory** (using `--dlt-path`):
62+
63+
```bash
64+
$ sqlmesh dlt_refresh <pipeline-name> --dlt-path <pipelines-directory>
65+
```
66+
5367
#### Configuration
5468

5569
SQLMesh will retrieve the data warehouse connection credentials from your dlt project to configure the `config.yaml` file. This configuration can be modified or customized as needed. For more details, refer to the [configuration guide](../guides/configuration.md).

sqlmesh/cli/example_project.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ def init_example_project(
252252
dialect: t.Optional[str],
253253
template: ProjectTemplate = ProjectTemplate.DEFAULT,
254254
pipeline: t.Optional[str] = None,
255+
dlt_path: t.Optional[str] = None,
255256
schema_name: str = "sqlmesh_example",
256257
) -> None:
257258
root_path = Path(path)
@@ -276,7 +277,9 @@ def init_example_project(
276277
start = None
277278
if template == ProjectTemplate.DLT:
278279
if pipeline and dialect:
279-
models, settings, start = generate_dlt_models_and_settings(pipeline, dialect)
280+
models, settings, start = generate_dlt_models_and_settings(
281+
pipeline_name=pipeline, dialect=dialect, dlt_path=dlt_path
282+
)
280283
else:
281284
raise click.ClickException(
282285
"DLT pipeline is a required argument to generate a SQLMesh project from DLT"

sqlmesh/cli/main.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,11 @@ def cli(
135135
type=str,
136136
help="DLT pipeline for which to generate a SQLMesh project. Use alongside template: dlt",
137137
)
138+
@click.option(
139+
"--dlt-path",
140+
type=str,
141+
help="The directory where the DLT pipeline resides. Use alongside template: dlt",
142+
)
138143
@click.pass_context
139144
@error_handler
140145
@cli_analytics
@@ -143,14 +148,19 @@ def init(
143148
sql_dialect: t.Optional[str] = None,
144149
template: t.Optional[str] = None,
145150
dlt_pipeline: t.Optional[str] = None,
151+
dlt_path: t.Optional[str] = None,
146152
) -> None:
147153
"""Create a new SQLMesh repository."""
148154
try:
149155
project_template = ProjectTemplate(template.lower() if template else "default")
150156
except ValueError:
151157
raise click.ClickException(f"Invalid project template '{template}'")
152158
init_example_project(
153-
ctx.obj, dialect=sql_dialect, template=project_template, pipeline=dlt_pipeline
159+
ctx.obj,
160+
dialect=sql_dialect,
161+
template=project_template,
162+
pipeline=dlt_pipeline,
163+
dlt_path=dlt_path,
154164
)
155165

156166

@@ -955,6 +965,11 @@ def table_name(obj: Context, model_name: str, dev: bool) -> None:
955965
default=False,
956966
help="If set, existing models are overwritten with the new DLT tables.",
957967
)
968+
@click.option(
969+
"--dlt-path",
970+
type=str,
971+
help="The directory where the DLT pipeline resides.",
972+
)
958973
@click.pass_context
959974
@error_handler
960975
@cli_analytics
@@ -963,11 +978,12 @@ def dlt_refresh(
963978
pipeline: str,
964979
force: bool,
965980
table: t.List[str] = [],
981+
dlt_path: t.Optional[str] = None,
966982
) -> None:
967983
"""Attaches to a DLT pipeline with the option to update specific or all missing tables in the SQLMesh project."""
968984
from sqlmesh.integrations.dlt import generate_dlt_models
969985

970-
sqlmesh_models = generate_dlt_models(ctx.obj, pipeline, list(table or []), force)
986+
sqlmesh_models = generate_dlt_models(ctx.obj, pipeline, list(table or []), force, dlt_path)
971987
if sqlmesh_models:
972988
model_names = "\n".join([f"- {model_name}" for model_name in sqlmesh_models])
973989
ctx.obj.console.log_success(f"Updated SQLMesh project with models:\n{model_names}")

sqlmesh/integrations/dlt.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,31 @@
99

1010

1111
def generate_dlt_models_and_settings(
12-
pipeline_name: str, dialect: str, tables: t.Optional[t.List[str]] = None
12+
pipeline_name: str,
13+
dialect: str,
14+
tables: t.Optional[t.List[str]] = None,
15+
dlt_path: t.Optional[str] = None,
1316
) -> t.Tuple[t.Set[t.Tuple[str, str]], str, str]:
14-
"""This function attaches to a DLT pipeline and retrieves the connection configs and
17+
"""
18+
This function attaches to a DLT pipeline and retrieves the connection configs and
1519
SQLMesh models based on the tables present in the pipeline's default schema.
20+
21+
Args:
22+
pipeline_name: The name of the DLT pipeline to attach to.
23+
dialect: The SQL dialect to use for generating SQLMesh models.
24+
tables: A list of table names to include.
25+
dlt_path: The path to the directory containing the DLT pipelines.
26+
27+
Returns:
28+
A tuple containing a set of the SQLMesh model definitions, the connection config and the start date.
1629
"""
1730

1831
import dlt
1932
from dlt.common.schema.utils import has_table_seen_data, is_complete_column
2033
from dlt.pipeline.exceptions import CannotRestorePipelineException
2134

2235
try:
23-
pipeline = dlt.attach(pipeline_name=pipeline_name)
36+
pipeline = dlt.attach(pipeline_name=pipeline_name, pipelines_dir=dlt_path or "")
2437
except CannotRestorePipelineException:
2538
raise click.ClickException(f"Could not attach to pipeline {pipeline_name}")
2639

@@ -108,14 +121,19 @@ def generate_dlt_models_and_settings(
108121

109122

110123
def generate_dlt_models(
111-
context: Context, pipeline_name: str, tables: t.List[str], force: bool
124+
context: Context,
125+
pipeline_name: str,
126+
tables: t.List[str],
127+
force: bool,
128+
dlt_path: t.Optional[str] = None,
112129
) -> t.List[str]:
113130
from sqlmesh.cli.example_project import _create_models
114131

115132
sqlmesh_models, _, _ = generate_dlt_models_and_settings(
116133
pipeline_name=pipeline_name,
117134
dialect=context.config.dialect or "",
118135
tables=tables if tables else None,
136+
dlt_path=dlt_path,
119137
)
120138

121139
if not tables and not force:

sqlmesh/magics.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,11 @@ def context(self, line: str) -> None:
165165
type=str,
166166
help="DLT pipeline for which to generate a SQLMesh project. Use alongside template: dlt",
167167
)
168+
@argument(
169+
"--dlt-path",
170+
type=str,
171+
help="The directory where the DLT pipeline resides. Use alongside template: dlt",
172+
)
168173
@line_magic
169174
def init(self, line: str) -> None:
170175
"""Creates a SQLMesh project scaffold with a default SQL dialect."""
@@ -175,7 +180,9 @@ def init(self, line: str) -> None:
175180
)
176181
except ValueError:
177182
raise MagicError(f"Invalid project template '{args.template}'")
178-
init_example_project(args.path, args.sql_dialect, project_template, args.dlt_pipeline)
183+
init_example_project(
184+
args.path, args.sql_dialect, project_template, args.dlt_pipeline, args.dlt_path
185+
)
179186
html = str(
180187
h(
181188
"div",
@@ -741,6 +748,11 @@ def table_name(self, context: Context, line: str) -> None:
741748
action="store_true",
742749
help="If set, existing models are overwritten with the new DLT tables.",
743750
)
751+
@argument(
752+
"--dlt-path",
753+
type=str,
754+
help="The directory where the DLT pipeline resides.",
755+
)
744756
@line_magic
745757
@pass_sqlmesh_context
746758
def dlt_refresh(self, context: Context, line: str) -> None:
@@ -749,7 +761,7 @@ def dlt_refresh(self, context: Context, line: str) -> None:
749761

750762
args = parse_argstring(self.dlt_refresh, line)
751763
sqlmesh_models = generate_dlt_models(
752-
context, args.pipeline, list(args.table or []), args.force
764+
context, args.pipeline, list(args.table or []), args.force, args.dlt_path
753765
)
754766
if sqlmesh_models:
755767
model_names = "\n".join([f"- {model_name}" for model_name in sqlmesh_models])

tests/cli/test_cli.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from contextlib import contextmanager
33
from os import getcwd, path, remove
44
from pathlib import Path
5+
from click import ClickException
56
import pytest
67
from click.testing import CliRunner
78
import time_machine
@@ -778,6 +779,8 @@ def test_dlt_pipeline_errors(runner, tmp_path):
778779

779780
@time_machine.travel(FREEZE_TIME)
780781
def test_plan_dlt(runner, tmp_path):
782+
from dlt.common.pipeline import get_dlt_pipelines_dir
783+
781784
root_dir = path.abspath(getcwd())
782785
pipeline_path = root_dir + "/examples/sushi_dlt/sushi_pipeline.py"
783786
dataset_path = root_dir + "/sushi.duckdb"
@@ -788,7 +791,15 @@ def test_plan_dlt(runner, tmp_path):
788791
with open(pipeline_path) as file:
789792
exec(file.read())
790793

791-
init_example_project(tmp_path, "duckdb", ProjectTemplate.DLT, "sushi")
794+
# This should fail since it won't be able to locate the pipeline in this path
795+
with pytest.raises(ClickException, match=r".*Could not attach to pipeline*"):
796+
init_example_project(
797+
tmp_path, "duckdb", ProjectTemplate.DLT, "sushi", dlt_path="./dlt2/pipelines"
798+
)
799+
800+
# By setting the pipelines path where the pipeline directory is located, it should work
801+
dlt_path = get_dlt_pipelines_dir()
802+
init_example_project(tmp_path, "duckdb", ProjectTemplate.DLT, "sushi", dlt_path=dlt_path)
792803

793804
expected_config = f"""gateways:
794805
duckdb:
@@ -925,8 +936,9 @@ def test_plan_dlt(runner, tmp_path):
925936
remove(dlt_sushi_fillings_model_path)
926937
remove(dlt_sushi_twice_nested_model_path)
927938

928-
# Update to generate a specific model: sushi_types
929-
assert generate_dlt_models(context, "sushi", ["sushi_types"], False) == [
939+
# Update to generate a specific model: sushi_types.
940+
# Also validate using the dlt_path that the pipelines are located.
941+
assert generate_dlt_models(context, "sushi", ["sushi_types"], False, dlt_path) == [
930942
"sushi_dataset_sqlmesh.incremental_sushi_types"
931943
]
932944

@@ -972,6 +984,7 @@ def test_init_project_dialects(tmp_path):
972984
remove(tmp_path / "config.yaml")
973985

974986

987+
@time_machine.travel(FREEZE_TIME)
975988
def test_environments(runner, tmp_path):
976989
create_example_project(tmp_path)
977990
ttl = time_like_to_str(to_datetime(now_ds()) + timedelta(days=7))

0 commit comments

Comments
 (0)