-
Notifications
You must be signed in to change notification settings - Fork 380
chore: improve cycle detection error #5338
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -99,6 +99,56 @@ def upstream(self, node: T) -> t.Set[T]: | |
|
|
||
| return self._upstream[node] | ||
|
|
||
| def _find_cycle_path(self, nodes_in_cycle: t.Dict[T, t.Set[T]]) -> t.Optional[t.List[T]]: | ||
| """Find the exact cycle path using DFS when a cycle is detected. | ||
|
|
||
| Args: | ||
| nodes_in_cycle: Dictionary of nodes that are part of the cycle and their dependencies | ||
|
|
||
| Returns: | ||
| List of nodes forming the cycle path, or None if no cycle found | ||
| """ | ||
| if not nodes_in_cycle: | ||
| return None | ||
|
|
||
| # Use DFS to find a cycle path | ||
| visited: t.Set[T] = set() | ||
| rec_stack: t.Set[T] = set() | ||
| path: t.List[T] = [] | ||
|
|
||
| def dfs(node: T) -> t.Optional[t.List[T]]: | ||
| if node in rec_stack: | ||
| # Found a cycle - extract the cycle path | ||
| cycle_start = path.index(node) | ||
| return path[cycle_start:] + [node] | ||
|
|
||
| if node in visited: | ||
| return None | ||
|
|
||
| visited.add(node) | ||
| rec_stack.add(node) | ||
| path.append(node) | ||
|
|
||
| # Only follow edges to nodes that are still in the unprocessed set | ||
| for neighbor in nodes_in_cycle.get(node, set()): | ||
| if neighbor in nodes_in_cycle: | ||
| cycle = dfs(neighbor) | ||
| if cycle: | ||
| return cycle | ||
|
|
||
| rec_stack.remove(node) | ||
| path.pop() | ||
| return None | ||
|
|
||
| # Try starting DFS from each unvisited node | ||
| for start_node in nodes_in_cycle: | ||
| if start_node not in visited: | ||
| cycle = dfs(start_node) | ||
| if cycle: | ||
| return cycle[:-1] # Remove the duplicate node at the end | ||
|
|
||
| return None | ||
|
|
||
| @property | ||
| def roots(self) -> t.Set[T]: | ||
| """Returns all nodes in the graph without any upstream dependencies.""" | ||
|
|
@@ -125,23 +175,28 @@ def sorted(self) -> t.List[T]: | |
| next_nodes = {node for node, deps in unprocessed_nodes.items() if not deps} | ||
|
|
||
| if not next_nodes: | ||
| # Sort cycle candidates to make the order deterministic | ||
| cycle_candidates_msg = ( | ||
| "\nPossible candidates to check for circular references: " | ||
| + ", ".join(str(node) for node in sorted(cycle_candidates)) | ||
| ) | ||
| # A cycle was detected - find the exact cycle path | ||
| cycle_path = self._find_cycle_path(unprocessed_nodes) | ||
|
|
||
| if last_processed_nodes: | ||
| last_processed_msg = "\nLast nodes added to the DAG: " + ", ".join( | ||
| str(node) for node in last_processed_nodes | ||
| ) | ||
| last_processed_msg = "" | ||
| if cycle_path: | ||
| cycle_msg = f"\nCycle: {' -> '.join(str(node) for node in cycle_path)} -> {cycle_path[0]}" | ||
| else: | ||
| last_processed_msg = "" | ||
| # Fallback message in case a cycle can't be found | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are there scenarios where a cycle won't be found? I'm wondering if we can remove the else.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No known scenarios at this time but it seems like a safe fallback to have in place for now.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be better to not have a fallback to get user feedback on what scenario hits it more quickly? |
||
| cycle_candidates_msg = ( | ||
| "\nPossible candidates to check for circular references: " | ||
| + ", ".join(str(node) for node in sorted(cycle_candidates)) | ||
| ) | ||
| cycle_msg = cycle_candidates_msg | ||
| if last_processed_nodes: | ||
| last_processed_msg = "\nLast nodes added to the DAG: " + ", ".join( | ||
| str(node) for node in last_processed_nodes | ||
| ) | ||
|
|
||
| raise SQLMeshError( | ||
| "Detected a cycle in the DAG. " | ||
| "Please make sure there are no circular references between nodes." | ||
| f"{last_processed_msg}{cycle_candidates_msg}" | ||
| f"{last_processed_msg}{cycle_msg}" | ||
| ) | ||
|
|
||
| for node in next_nodes: | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.