Skip to content

Commit fce766b

Browse files
authored
Fix: Iterate through the whole DAG before raising an error when creating physical tables (#4305)
1 parent 029acc6 commit fce766b

3 files changed

Lines changed: 22 additions & 5 deletions

File tree

sqlmesh/core/plan/evaluator.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
SnapshotId,
3434
SnapshotInfoLike,
3535
SnapshotTableInfo,
36+
SnapshotCreationFailedError,
3637
)
3738
from sqlmesh.utils import CompletionStatus
3839
from sqlmesh.core.state_sync import StateSync
@@ -291,12 +292,14 @@ def _should_create(s: Snapshot) -> bool:
291292
),
292293
on_complete=self.console.update_creation_progress,
293294
)
294-
except NodeExecutionFailedError as ex:
295+
except SnapshotCreationFailedError as ex:
295296
self.console.stop_creation_progress(success=False)
296297
progress_stopped = True
297298

298-
logger.info(str(ex), exc_info=ex)
299-
self.console.log_failed_models([ex])
299+
for error in ex.errors:
300+
logger.info(str(error), exc_info=error)
301+
302+
self.console.log_failed_models(ex.errors)
300303

301304
raise PlanError("Plan application failed.")
302305
finally:

sqlmesh/core/snapshot/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,7 @@
2626
table_name as table_name,
2727
to_table_mapping as to_table_mapping,
2828
)
29-
from sqlmesh.core.snapshot.evaluator import SnapshotEvaluator as SnapshotEvaluator
29+
from sqlmesh.core.snapshot.evaluator import (
30+
SnapshotEvaluator as SnapshotEvaluator,
31+
SnapshotCreationFailedError as SnapshotCreationFailedError,
32+
)

sqlmesh/core/snapshot/evaluator.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
from sqlmesh.utils.concurrency import (
6767
concurrent_apply_to_snapshots,
6868
concurrent_apply_to_values,
69+
NodeExecutionFailedError,
6970
)
7071
from sqlmesh.utils.date import TimeLike, now, time_like_to_str
7172
from sqlmesh.utils.errors import (
@@ -87,6 +88,13 @@
8788
logger = logging.getLogger(__name__)
8889

8990

91+
class SnapshotCreationFailedError(SQLMeshError):
92+
def __init__(self, errors: t.List[NodeExecutionFailedError[SnapshotId]]):
93+
messages = "\n\n".join(f"{error}\n {error.__cause__}" for error in errors)
94+
super().__init__(f"Physical table creation failed:\n\n{messages}")
95+
self.errors = errors
96+
97+
9098
class SnapshotEvaluator:
9199
"""Evaluates a snapshot given runtime arguments through an arbitrary EngineAdapter.
92100
@@ -397,7 +405,7 @@ def _create_snapshots(
397405
) -> None:
398406
"""Internal method to create tables in parallel."""
399407
with self.concurrent_context():
400-
concurrent_apply_to_snapshots(
408+
errors, _ = concurrent_apply_to_snapshots(
401409
snapshots_to_create,
402410
lambda s: self._create_snapshot(
403411
s,
@@ -408,7 +416,10 @@ def _create_snapshots(
408416
allow_destructive_snapshots=allow_destructive_snapshots,
409417
),
410418
self.ddl_concurrent_tasks,
419+
raise_on_error=False,
411420
)
421+
if errors:
422+
raise SnapshotCreationFailedError(errors)
412423

413424
def migrate(
414425
self,

0 commit comments

Comments
 (0)