Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 156 additions & 31 deletions sqlmesh/lsp/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"""A Language Server Protocol (LSP) server for SQL with SQLMesh integration, refactored without globals."""

from itertools import chain
import asyncio
import logging
import typing as t
from pathlib import Path
Expand Down Expand Up @@ -205,7 +206,7 @@ def function_call(ls: LanguageServer, params: t.Any) -> t.Dict[str, t.Any]:
self.server.feature(name)(create_function_call(method))

@self.server.feature(types.INITIALIZE)
def initialize(ls: LanguageServer, params: types.InitializeParams) -> None:
async def initialize(ls: LanguageServer, params: types.InitializeParams) -> None:
"""Initialize the server when the client connects."""
try:
# Check if the client supports pull diagnostics
Expand All @@ -232,7 +233,7 @@ def initialize(ls: LanguageServer, params: types.InitializeParams) -> None:
for ext in ("py", "yml", "yaml"):
config_path = folder_path / f"config.{ext}"
if config_path.exists():
if self._create_lsp_context([folder_path]):
if await self._create_lsp_context([folder_path]):
loaded_sqlmesh_message(ls, folder_path)
return # Exit after successfully loading any config
except Exception as e:
Expand All @@ -254,14 +255,60 @@ def did_open(ls: LanguageServer, params: types.DidOpenTextDocumentParams) -> Non
)

@self.server.feature(types.TEXT_DOCUMENT_DID_SAVE)
def did_save(ls: LanguageServer, params: types.DidSaveTextDocumentParams) -> None:
async def did_save(ls: LanguageServer, params: types.DidSaveTextDocumentParams) -> None:
uri = URI(params.text_document.uri)
if self.lsp_context is None:
return

context = self.lsp_context.context
context.load()
self.lsp_context = LSPContext(context)
retry_count = 1 # Single retry for save operations to avoid blocking UI
last_error = None

for attempt in range(retry_count):
try:
# Run context.load() in a separate thread with asyncio timeout
loop = asyncio.get_event_loop()

# Run the blocking operation in a thread pool
try:
await asyncio.wait_for(
loop.run_in_executor(None, context.load), timeout=30.0
)
self.lsp_context = LSPContext(context)
break # Success, exit retry loop
except asyncio.TimeoutError:
ls.log_trace(
f"Context.load() timed out after 30 seconds (attempt {attempt + 1}/{retry_count})"
)
if attempt < retry_count - 1:
# Wait before retrying
wait_time = 2**attempt # 1s, 2s
ls.log_trace(f"Retrying in {wait_time} seconds...")
await asyncio.sleep(wait_time)
continue
else:
ls.show_message(
"SQLMesh: Model reload timed out after multiple attempts. The LSP server will continue with the previous state.",
types.MessageType.Warning,
)
return
except Exception as e:
last_error = e
ls.log_trace(
f"Error reloading context (attempt {attempt + 1}/{retry_count}): {e}"
)

if attempt < retry_count - 1:
# Wait before retrying
wait_time = 2**attempt # 1s, 2s
ls.log_trace(f"Retrying in {wait_time} seconds...")
await asyncio.sleep(wait_time)
else:
ls.show_message(
f"SQLMesh: Error reloading models after {retry_count} attempts: {str(last_error)}. The LSP server will continue with the previous state.",
types.MessageType.Warning,
)
return

# Only publish diagnostics if client doesn't support pull diagnostics
if not self.client_supports_pull_diagnostics:
Expand Down Expand Up @@ -616,13 +663,37 @@ def _get_diagnostics_for_uri(self, uri: URI) -> t.Tuple[t.List[types.Diagnostic]
return [], 0

def _context_get_or_load(self, document_uri: t.Optional[URI] = None) -> LSPContext:
"""Synchronous wrapper for async context loading.

Always attempts to create context if it doesn't exist, regardless of previous failures.
"""
if self.lsp_context is None:
# Always try to load context when it's needed
self.server.log_trace("Context not loaded, attempting to create...")
# Run the async method in a new event loop if needed
import asyncio

try:
loop = asyncio.get_running_loop()
# We're already in an async context, can't use asyncio.run
raise RuntimeError("Cannot load context synchronously from async context")
except RuntimeError:
# No running loop, we can use asyncio.run
asyncio.run(self._ensure_context_for_document(document_uri))
if self.lsp_context is None:
# If we still don't have a context after trying to load, raise an error
# But don't prevent future attempts
raise RuntimeError("No context found able to get or load")
return self.lsp_context

async def _context_get_or_load_async(self, document_uri: t.Optional[URI] = None) -> LSPContext:
if self.lsp_context is None:
self._ensure_context_for_document(document_uri)
await self._ensure_context_for_document(document_uri)
if self.lsp_context is None:
raise RuntimeError("No context found able to get or load")
return self.lsp_context

def _ensure_context_for_document(
async def _ensure_context_for_document(
self,
document_uri: t.Optional[URI] = None,
) -> None:
Expand All @@ -635,12 +706,12 @@ def _ensure_context_for_document(
if document_path.is_file() and document_path.suffix in (".sql", ".py"):
document_folder = document_path.parent
if document_folder.is_dir():
self._ensure_context_in_folder(document_folder)
await self._ensure_context_in_folder(document_folder)
return

return self._ensure_context_in_folder()
return await self._ensure_context_in_folder()

def _ensure_context_in_folder(self, folder_path: t.Optional[Path] = None) -> None:
async def _ensure_context_in_folder(self, folder_path: t.Optional[Path] = None) -> None:
if self.lsp_context is not None:
return

Expand All @@ -649,7 +720,7 @@ def _ensure_context_in_folder(self, folder_path: t.Optional[Path] = None) -> Non
for ext in ("py", "yml", "yaml"):
config_path = workspace_folder / f"config.{ext}"
if config_path.exists():
if self._create_lsp_context([workspace_folder]):
if await self._create_lsp_context([workspace_folder]):
return

# Then, check the provided folder recursively
Expand All @@ -660,7 +731,7 @@ def _ensure_context_in_folder(self, folder_path: t.Optional[Path] = None) -> Non
for ext in ("py", "yml", "yaml"):
config_path = path / f"config.{ext}"
if config_path.exists():
if self._create_lsp_context([path]):
if await self._create_lsp_context([path]):
return

path = path.parent
Expand All @@ -672,37 +743,91 @@ def _ensure_context_in_folder(self, folder_path: t.Optional[Path] = None) -> Non
+ (f" or in {folder_path}" if folder_path else "")
)

def _create_lsp_context(self, paths: t.List[Path]) -> t.Optional[LSPContext]:
async def _create_lsp_context(
self, paths: t.List[Path], retry_count: int = 1
) -> t.Optional[LSPContext]:
"""Create a new LSPContext instance using the configured context class.

On success, sets self.lsp_context and returns the created context.

Args:
paths: List of paths to pass to the context constructor
retry_count: Number of times to retry on failure (default: 1)

Returns:
A new LSPContext instance wrapping the created context, or None if creation fails
"""
try:
if self.lsp_context is None:
context = self.context_class(paths=paths)
loaded_sqlmesh_message(self.server, paths[0])
else:
self.lsp_context.context.load()
context = self.lsp_context.context
self.lsp_context = LSPContext(context)
return self.lsp_context
except Exception as e:
# Only show the error message once
if not self.has_raised_loading_error:
self.server.show_message(
f"Error creating context: {e}",
types.MessageType.Error,
# Always attempt to create context when requested
if self.has_raised_loading_error:
self.server.log_trace("Retrying context creation after previous failure...")
# Give a small delay to allow file system changes to be registered
await asyncio.sleep(0.5)

last_error = None

for attempt in range(retry_count):
try:
if self.lsp_context is None:
context = self.context_class(paths=paths)
# Show success message, currently also showing recovering from error
if self.has_raised_loading_error:
self.server.show_message(
f"Successfully loaded SQLMesh context from {paths[0]} (recovered from previous error)",
types.MessageType.Info,
)
loaded_sqlmesh_message(self.server, paths[0])
# Reset error flag on successful load
self.has_raised_loading_error = False
else:
# Run context.load() with asyncio timeout
loop = asyncio.get_event_loop()

try:
await asyncio.wait_for(
loop.run_in_executor(None, self.lsp_context.context.load), timeout=30.0
)
context = self.lsp_context.context
# Reset error flag on successful load
self.has_raised_loading_error = False
except asyncio.TimeoutError:
self.server.log_trace(
f"Context.load() timed out after 30 seconds in _create_lsp_context (attempt {attempt + 1}/{retry_count})"
)
if attempt < retry_count - 1:
# Wait before retrying (exponential backoff (1s, 2s, 4s))
wait_time = 2**attempt
self.server.log_trace(f"Retrying in {wait_time} seconds...")
await asyncio.sleep(wait_time)
continue
return None

self.lsp_context = LSPContext(context)
return self.lsp_context

except Exception as e:
last_error = e
self.server.log_trace(
f"Error creating context (attempt {attempt + 1}/{retry_count}): {e}"
)
self.has_raised_loading_error = True

self.server.log_trace(f"Error creating context: {e}")
return None
if attempt < retry_count - 1:
# Wait before retrying (exponential backoff)
wait_time = 2**attempt # 1s, 2s, 4s
self.server.log_trace(f"Retrying in {wait_time} seconds...")
await asyncio.sleep(wait_time)
else:
# Show error message only if we haven't shown it recently
if not self.has_raised_loading_error:
self.server.show_message(
f"Error creating context: {last_error}",
types.MessageType.Error,
)
self.has_raised_loading_error = True
else:
# Log the error but don't show notification to avoid spamming
self.server.log_trace(f"Context creation failed again: {last_error}")

return None

@staticmethod
def _diagnostic_to_lsp_diagnostic(
Expand Down
16 changes: 0 additions & 16 deletions vscode/extension/src/lsp/lsp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,6 @@ export class LSPClient implements Disposable {
transport: TransportKind.stdio,
options: {
cwd: workspacePath,
// TODO: This is a temporary fix to avoid the issue with the LSP server
// crashing when the number of workers is too high. This is a workaround
// to avoid the issue. Once fixed, we should remove the whole env block.
env: {
MAX_FORK_WORKERS: '1',
...process.env,
...sqlmesh.value.env,
},
},
args: sqlmesh.value.args,
},
Expand All @@ -97,14 +89,6 @@ export class LSPClient implements Disposable {
transport: TransportKind.stdio,
options: {
cwd: workspacePath,
env: {
// TODO: This is a temporary fix to avoid the issue with the LSP server
// crashing when the number of workers is too high. This is a workaround
// to avoid the issue. Once fixed, we should remove the whole env block.
MAX_FORK_WORKERS: '1',
...process.env,
...sqlmesh.value.env,
},
},
args: sqlmesh.value.args,
},
Expand Down