From bf8f2784aa831966042f1a4cd0ed6def02f0cfa5 Mon Sep 17 00:00:00 2001 From: Themis Valtinos <73662635+themisvaltinos@users.noreply.github.com> Date: Fri, 20 Jun 2025 10:24:05 +0300 Subject: [PATCH] Fix(lsp): Make server stable with async context loading --- sqlmesh/lsp/main.py | 187 ++++++++++++++++++++++++++------ vscode/extension/src/lsp/lsp.ts | 16 --- 2 files changed, 156 insertions(+), 47 deletions(-) diff --git a/sqlmesh/lsp/main.py b/sqlmesh/lsp/main.py index 2a3e18c6ea..66aef3007f 100755 --- a/sqlmesh/lsp/main.py +++ b/sqlmesh/lsp/main.py @@ -2,6 +2,7 @@ """A Language Server Protocol (LSP) server for SQL with SQLMesh integration, refactored without globals.""" from itertools import chain +import asyncio import logging import typing as t from pathlib import Path @@ -205,7 +206,7 @@ def function_call(ls: LanguageServer, params: t.Any) -> t.Dict[str, t.Any]: self.server.feature(name)(create_function_call(method)) @self.server.feature(types.INITIALIZE) - def initialize(ls: LanguageServer, params: types.InitializeParams) -> None: + async def initialize(ls: LanguageServer, params: types.InitializeParams) -> None: """Initialize the server when the client connects.""" try: # Check if the client supports pull diagnostics @@ -232,7 +233,7 @@ def initialize(ls: LanguageServer, params: types.InitializeParams) -> None: for ext in ("py", "yml", "yaml"): config_path = folder_path / f"config.{ext}" if config_path.exists(): - if self._create_lsp_context([folder_path]): + if await self._create_lsp_context([folder_path]): loaded_sqlmesh_message(ls, folder_path) return # Exit after successfully loading any config except Exception as e: @@ -254,14 +255,60 @@ def did_open(ls: LanguageServer, params: types.DidOpenTextDocumentParams) -> Non ) @self.server.feature(types.TEXT_DOCUMENT_DID_SAVE) - def did_save(ls: LanguageServer, params: types.DidSaveTextDocumentParams) -> None: + async def did_save(ls: LanguageServer, params: types.DidSaveTextDocumentParams) -> None: uri = URI(params.text_document.uri) if self.lsp_context is None: return context = self.lsp_context.context - context.load() - self.lsp_context = LSPContext(context) + retry_count = 1 # Single retry for save operations to avoid blocking UI + last_error = None + + for attempt in range(retry_count): + try: + # Run context.load() in a separate thread with asyncio timeout + loop = asyncio.get_event_loop() + + # Run the blocking operation in a thread pool + try: + await asyncio.wait_for( + loop.run_in_executor(None, context.load), timeout=30.0 + ) + self.lsp_context = LSPContext(context) + break # Success, exit retry loop + except asyncio.TimeoutError: + ls.log_trace( + f"Context.load() timed out after 30 seconds (attempt {attempt + 1}/{retry_count})" + ) + if attempt < retry_count - 1: + # Wait before retrying + wait_time = 2**attempt # 1s, 2s + ls.log_trace(f"Retrying in {wait_time} seconds...") + await asyncio.sleep(wait_time) + continue + else: + ls.show_message( + "SQLMesh: Model reload timed out after multiple attempts. The LSP server will continue with the previous state.", + types.MessageType.Warning, + ) + return + except Exception as e: + last_error = e + ls.log_trace( + f"Error reloading context (attempt {attempt + 1}/{retry_count}): {e}" + ) + + if attempt < retry_count - 1: + # Wait before retrying + wait_time = 2**attempt # 1s, 2s + ls.log_trace(f"Retrying in {wait_time} seconds...") + await asyncio.sleep(wait_time) + else: + ls.show_message( + f"SQLMesh: Error reloading models after {retry_count} attempts: {str(last_error)}. The LSP server will continue with the previous state.", + types.MessageType.Warning, + ) + return # Only publish diagnostics if client doesn't support pull diagnostics if not self.client_supports_pull_diagnostics: @@ -616,13 +663,37 @@ def _get_diagnostics_for_uri(self, uri: URI) -> t.Tuple[t.List[types.Diagnostic] return [], 0 def _context_get_or_load(self, document_uri: t.Optional[URI] = None) -> LSPContext: + """Synchronous wrapper for async context loading. + + Always attempts to create context if it doesn't exist, regardless of previous failures. + """ + if self.lsp_context is None: + # Always try to load context when it's needed + self.server.log_trace("Context not loaded, attempting to create...") + # Run the async method in a new event loop if needed + import asyncio + + try: + loop = asyncio.get_running_loop() + # We're already in an async context, can't use asyncio.run + raise RuntimeError("Cannot load context synchronously from async context") + except RuntimeError: + # No running loop, we can use asyncio.run + asyncio.run(self._ensure_context_for_document(document_uri)) + if self.lsp_context is None: + # If we still don't have a context after trying to load, raise an error + # But don't prevent future attempts + raise RuntimeError("No context found able to get or load") + return self.lsp_context + + async def _context_get_or_load_async(self, document_uri: t.Optional[URI] = None) -> LSPContext: if self.lsp_context is None: - self._ensure_context_for_document(document_uri) + await self._ensure_context_for_document(document_uri) if self.lsp_context is None: raise RuntimeError("No context found able to get or load") return self.lsp_context - def _ensure_context_for_document( + async def _ensure_context_for_document( self, document_uri: t.Optional[URI] = None, ) -> None: @@ -635,12 +706,12 @@ def _ensure_context_for_document( if document_path.is_file() and document_path.suffix in (".sql", ".py"): document_folder = document_path.parent if document_folder.is_dir(): - self._ensure_context_in_folder(document_folder) + await self._ensure_context_in_folder(document_folder) return - return self._ensure_context_in_folder() + return await self._ensure_context_in_folder() - def _ensure_context_in_folder(self, folder_path: t.Optional[Path] = None) -> None: + async def _ensure_context_in_folder(self, folder_path: t.Optional[Path] = None) -> None: if self.lsp_context is not None: return @@ -649,7 +720,7 @@ def _ensure_context_in_folder(self, folder_path: t.Optional[Path] = None) -> Non for ext in ("py", "yml", "yaml"): config_path = workspace_folder / f"config.{ext}" if config_path.exists(): - if self._create_lsp_context([workspace_folder]): + if await self._create_lsp_context([workspace_folder]): return # Then , check the provided folder recursively @@ -660,7 +731,7 @@ def _ensure_context_in_folder(self, folder_path: t.Optional[Path] = None) -> Non for ext in ("py", "yml", "yaml"): config_path = path / f"config.{ext}" if config_path.exists(): - if self._create_lsp_context([path]): + if await self._create_lsp_context([path]): return path = path.parent @@ -672,37 +743,91 @@ def _ensure_context_in_folder(self, folder_path: t.Optional[Path] = None) -> Non + (f" or in {folder_path}" if folder_path else "") ) - def _create_lsp_context(self, paths: t.List[Path]) -> t.Optional[LSPContext]: + async def _create_lsp_context( + self, paths: t.List[Path], retry_count: int = 1 + ) -> t.Optional[LSPContext]: """Create a new LSPContext instance using the configured context class. On success, sets self.lsp_context and returns the created context. Args: paths: List of paths to pass to the context constructor + retry_count: Number of times to retry on failure (default: 1) Returns: A new LSPContext instance wrapping the created context, or None if creation fails """ - try: - if self.lsp_context is None: - context = self.context_class(paths=paths) - loaded_sqlmesh_message(self.server, paths[0]) - else: - self.lsp_context.context.load() - context = self.lsp_context.context - self.lsp_context = LSPContext(context) - return self.lsp_context - except Exception as e: - # Only show the error message once - if not self.has_raised_loading_error: - self.server.show_message( - f"Error creating context: {e}", - types.MessageType.Error, + # Always attempt to create context when requested + if self.has_raised_loading_error: + self.server.log_trace("Retrying context creation after previous failure...") + # Give a small delay to allow file system changes to be registered + await asyncio.sleep(0.5) + + last_error = None + + for attempt in range(retry_count): + try: + if self.lsp_context is None: + context = self.context_class(paths=paths) + # Show success message, currently also showing recovering from error + if self.has_raised_loading_error: + self.server.show_message( + f"Successfully loaded SQLMesh context from {paths[0]} (recovered from previous error)", + types.MessageType.Info, + ) + loaded_sqlmesh_message(self.server, paths[0]) + # Reset error flag on successful load + self.has_raised_loading_error = False + else: + # Run context.load() with asyncio timeout + loop = asyncio.get_event_loop() + + try: + await asyncio.wait_for( + loop.run_in_executor(None, self.lsp_context.context.load), timeout=30.0 + ) + context = self.lsp_context.context + # Reset error flag on successful load + self.has_raised_loading_error = False + except asyncio.TimeoutError: + self.server.log_trace( + f"Context.load() timed out after 30 seconds in _create_lsp_context (attempt {attempt + 1}/{retry_count})" + ) + if attempt < retry_count - 1: + # Wait before retrying (exponential backoff (1s, 2s, 4s)) + wait_time = 2**attempt + self.server.log_trace(f"Retrying in {wait_time} seconds...") + await asyncio.sleep(wait_time) + continue + return None + + self.lsp_context = LSPContext(context) + return self.lsp_context + + except Exception as e: + last_error = e + self.server.log_trace( + f"Error creating context (attempt {attempt + 1}/{retry_count}): {e}" ) - self.has_raised_loading_error = True - self.server.log_trace(f"Error creating context: {e}") - return None + if attempt < retry_count - 1: + # Wait before retrying (exponential backoff) + wait_time = 2**attempt # 1s, 2s, 4s + self.server.log_trace(f"Retrying in {wait_time} seconds...") + await asyncio.sleep(wait_time) + else: + # Show error message only if we haven't shown it recently + if not self.has_raised_loading_error: + self.server.show_message( + f"Error creating context: {last_error}", + types.MessageType.Error, + ) + self.has_raised_loading_error = True + else: + # Log the error but don't show notification to avoid spamming + self.server.log_trace(f"Context creation failed again: {last_error}") + + return None @staticmethod def _diagnostic_to_lsp_diagnostic( diff --git a/vscode/extension/src/lsp/lsp.ts b/vscode/extension/src/lsp/lsp.ts index 9432762aed..a1ad08a864 100644 --- a/vscode/extension/src/lsp/lsp.ts +++ b/vscode/extension/src/lsp/lsp.ts @@ -81,14 +81,6 @@ export class LSPClient implements Disposable { transport: TransportKind.stdio, options: { cwd: workspacePath, - // TODO: This is a temporary fix to avoid the issue with the LSP server - // crashing when the number of workers is too high. This is a workaround - // to avoid the issue. Once fixed, we should remove the whole env block. - env: { - MAX_FORK_WORKERS: '1', - ...process.env, - ...sqlmesh.value.env, - }, }, args: sqlmesh.value.args, }, @@ -97,14 +89,6 @@ export class LSPClient implements Disposable { transport: TransportKind.stdio, options: { cwd: workspacePath, - env: { - // TODO: This is a temporary fix to avoid the issue with the LSP server - // crashing when the number of workers is too high. This is a workaround - // to avoid the issue. Once fixed, we should remove the whole env block. - MAX_FORK_WORKERS: '1', - ...process.env, - ...sqlmesh.value.env, - }, }, args: sqlmesh.value.args, },