From 5532ef35d9750c72f6cf11ca95dad00641f29ca9 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Sat, 1 Feb 2025 12:10:42 -0500 Subject: [PATCH] LSP: Remove future wrapper from `Client::notify`, `Client::reply` Previously LSP notifications were sent within a future and most callers used a `tokio::spawn` to send the notification, silently discarding any failures like problems serializing parameters or sending on the channel. It's possible that tokio could schedule futures out of intended order though which could cause notifications where order is important, like document synchronization, to become partially shuffled. This change removes the future wrapper and logs all internal failures. Also included in this commit is the same change for `Client::reply` which was also unnecessarily wrapped in a future. Co-authored-by: Pascal Kuthe --- helix-lsp/src/client.rs | 141 ++++++++++++++++------------------ helix-lsp/src/file_event.rs | 18 ++--- helix-lsp/src/lib.rs | 12 +-- helix-term/src/application.rs | 14 +++- helix-view/src/document.rs | 15 +--- helix-view/src/editor.rs | 15 ++-- 6 files changed, 93 insertions(+), 122 deletions(-) diff --git a/helix-lsp/src/client.rs b/helix-lsp/src/client.rs index f0e42ee44..31742804d 100644 --- a/helix-lsp/src/client.rs +++ b/helix-lsp/src/client.rs @@ -170,7 +170,7 @@ impl Client { // and that we can therefore reuse the client (but are done now) return; } - tokio::spawn(self.did_change_workspace(vec![workspace_for_uri(root_uri)], Vec::new())); + self.did_change_workspace(vec![workspace_for_uri(root_uri)], Vec::new()) } #[allow(clippy::type_complexity, clippy::too_many_arguments)] @@ -456,29 +456,36 @@ impl Client { } /// Send a RPC notification to the language server. - pub fn notify( - &self, - params: R::Params, - ) -> impl Future> + pub fn notify(&self, params: R::Params) where R::Params: serde::Serialize, { let server_tx = self.server_tx.clone(); - async move { - let params = serde_json::to_value(params)?; + let params = match serde_json::to_value(params) { + Ok(params) => params, + Err(err) => { + log::error!( + "Failed to serialize params for notification '{}' for server '{}': {err}", + R::METHOD, + self.name, + ); + return; + } + }; - let notification = jsonrpc::Notification { - jsonrpc: Some(jsonrpc::Version::V2), - method: R::METHOD.to_string(), - params: Self::value_into_params(params), - }; + let notification = jsonrpc::Notification { + jsonrpc: Some(jsonrpc::Version::V2), + method: R::METHOD.to_string(), + params: Self::value_into_params(params), + }; - server_tx - .send(Payload::Notification(notification)) - .map_err(|e| Error::Other(e.into()))?; - - Ok(()) + if let Err(err) = server_tx.send(Payload::Notification(notification)) { + log::error!( + "Failed to send notification '{}' to server '{}': {err}", + R::METHOD, + self.name + ); } } @@ -487,31 +494,29 @@ impl Client { &self, id: jsonrpc::Id, result: core::result::Result, - ) -> impl Future> { + ) -> Result<()> { use jsonrpc::{Failure, Output, Success, Version}; let server_tx = self.server_tx.clone(); - async move { - let output = match result { - Ok(result) => Output::Success(Success { - jsonrpc: Some(Version::V2), - id, - result: serde_json::to_value(result)?, - }), - Err(error) => Output::Failure(Failure { - jsonrpc: Some(Version::V2), - id, - error, - }), - }; + let output = match result { + Ok(result) => Output::Success(Success { + jsonrpc: Some(Version::V2), + id, + result, + }), + Err(error) => Output::Failure(Failure { + jsonrpc: Some(Version::V2), + id, + error, + }), + }; - server_tx - .send(Payload::Response(output)) - .map_err(|e| Error::Other(e.into()))?; + server_tx + .send(Payload::Response(output)) + .map_err(|e| Error::Other(e.into()))?; - Ok(()) - } + Ok(()) } // ------------------------------------------------------------------------------------------- @@ -693,7 +698,7 @@ impl Client { self.request::(()).await } - pub fn exit(&self) -> impl Future> { + pub fn exit(&self) { self.notify::(()) } @@ -701,7 +706,8 @@ impl Client { /// early if server responds with an error. pub async fn shutdown_and_exit(&self) -> Result<()> { self.shutdown().await?; - self.exit().await + self.exit(); + Ok(()) } /// Forcefully shuts down the language server ignoring any errors. @@ -709,24 +715,21 @@ impl Client { if let Err(e) = self.shutdown().await { log::warn!("language server failed to terminate gracefully - {}", e); } - self.exit().await + self.exit(); + Ok(()) } // ------------------------------------------------------------------------------------------- // Workspace // ------------------------------------------------------------------------------------------- - pub fn did_change_configuration(&self, settings: Value) -> impl Future> { + pub fn did_change_configuration(&self, settings: Value) { self.notify::( lsp::DidChangeConfigurationParams { settings }, ) } - pub fn did_change_workspace( - &self, - added: Vec, - removed: Vec, - ) -> impl Future> { + pub fn did_change_workspace(&self, added: Vec, removed: Vec) { self.notify::(DidChangeWorkspaceFoldersParams { event: WorkspaceFoldersChangeEvent { added, removed }, }) @@ -766,12 +769,7 @@ impl Client { }) } - pub fn did_rename( - &self, - old_path: &Path, - new_path: &Path, - is_dir: bool, - ) -> Option>> { + pub fn did_rename(&self, old_path: &Path, new_path: &Path, is_dir: bool) -> Option<()> { let capabilities = self.file_operations_intests(); if !capabilities.did_rename.has_interest(new_path, is_dir) { return None; @@ -789,7 +787,8 @@ impl Client { old_uri: url_from_path(old_path)?, new_uri: url_from_path(new_path)?, }]; - Some(self.notify::(lsp::RenameFilesParams { files })) + self.notify::(lsp::RenameFilesParams { files }); + Some(()) } // ------------------------------------------------------------------------------------------- @@ -802,7 +801,7 @@ impl Client { version: i32, doc: &Rope, language_id: String, - ) -> impl Future> { + ) { self.notify::(lsp::DidOpenTextDocumentParams { text_document: lsp::TextDocumentItem { uri, @@ -929,7 +928,7 @@ impl Client { old_text: &Rope, new_text: &Rope, changes: &ChangeSet, - ) -> Option>> { + ) -> Option<()> { let capabilities = self.capabilities.get().unwrap(); // Return early if the server does not support document sync. @@ -961,18 +960,14 @@ impl Client { kind => unimplemented!("{:?}", kind), }; - Some(self.notify::( - lsp::DidChangeTextDocumentParams { - text_document, - content_changes: changes, - }, - )) + self.notify::(lsp::DidChangeTextDocumentParams { + text_document, + content_changes: changes, + }); + Some(()) } - pub fn text_document_did_close( - &self, - text_document: lsp::TextDocumentIdentifier, - ) -> impl Future> { + pub fn text_document_did_close(&self, text_document: lsp::TextDocumentIdentifier) { self.notify::(lsp::DidCloseTextDocumentParams { text_document, }) @@ -984,7 +979,7 @@ impl Client { &self, text_document: lsp::TextDocumentIdentifier, text: &Rope, - ) -> Option>> { + ) -> Option<()> { let capabilities = self.capabilities.get().unwrap(); let include_text = match &capabilities.text_document_sync.as_ref()? { @@ -1002,12 +997,11 @@ impl Client { lsp::TextDocumentSyncCapability::Kind(..) => false, }; - Some(self.notify::( - lsp::DidSaveTextDocumentParams { - text_document, - text: include_text.then_some(text.into()), - }, - )) + self.notify::(lsp::DidSaveTextDocumentParams { + text_document, + text: include_text.then_some(text.into()), + }); + Some(()) } pub fn completion( @@ -1540,10 +1534,7 @@ impl Client { Some(self.call::(params)) } - pub fn did_change_watched_files( - &self, - changes: Vec, - ) -> impl Future> { + pub fn did_change_watched_files(&self, changes: Vec) { self.notify::(lsp::DidChangeWatchedFilesParams { changes, }) diff --git a/helix-lsp/src/file_event.rs b/helix-lsp/src/file_event.rs index c7297d67f..5e7f8ca60 100644 --- a/helix-lsp/src/file_event.rs +++ b/helix-lsp/src/file_event.rs @@ -113,17 +113,13 @@ impl Handler { "Sending didChangeWatchedFiles notification to client '{}'", client.name() ); - if let Err(err) = crate::block_on(client - .did_change_watched_files(vec![lsp::FileEvent { - uri, - // We currently always send the CHANGED state - // since we don't actually have more context at - // the moment. - typ: lsp::FileChangeType::CHANGED, - }])) - { - log::warn!("Failed to send didChangeWatchedFiles notification to client: {err}"); - } + client.did_change_watched_files(vec![lsp::FileEvent { + uri, + // We currently always send the CHANGED state + // since we don't actually have more context at + // the moment. + typ: lsp::FileChangeType::CHANGED, + }]); true }); } diff --git a/helix-lsp/src/lib.rs b/helix-lsp/src/lib.rs index 7ece350a0..5eeea81c2 100644 --- a/helix-lsp/src/lib.rs +++ b/helix-lsp/src/lib.rs @@ -900,17 +900,7 @@ fn start_client( } // next up, notify - let notification_result = _client - .notify::(lsp::InitializedParams {}) - .await; - - if let Err(e) = notification_result { - log::error!( - "failed to notify language server of its initialization: {}", - e - ); - return; - } + _client.notify::(lsp::InitializedParams {}); initialize_notify.notify_one(); }); diff --git a/helix-term/src/application.rs b/helix-term/src/application.rs index 00aa73902..ba74a817a 100644 --- a/helix-term/src/application.rs +++ b/helix-term/src/application.rs @@ -722,7 +722,7 @@ impl Application { // This might not be required by the spec but Neovim does this as well, so it's // probably a good idea for compatibility. if let Some(config) = language_server.config() { - tokio::spawn(language_server.did_change_configuration(config.clone())); + language_server.did_change_configuration(config.clone()); } let docs = self @@ -740,12 +740,12 @@ impl Application { let language_id = doc.language_id().map(ToOwned::to_owned).unwrap_or_default(); - tokio::spawn(language_server.text_document_did_open( + language_server.text_document_did_open( url, doc.version(), doc.text(), language_id, - )); + ); } } Notification::PublishDiagnostics(mut params) => { @@ -1131,7 +1131,13 @@ impl Application { } }; - tokio::spawn(language_server!().reply(id, reply)); + let language_server = language_server!(); + if let Err(err) = language_server.reply(id.clone(), reply) { + log::error!( + "Failed to send reply to server '{}' request {id}: {err}", + language_server.name() + ); + } } Call::Invalid { id } => log::error!("LSP invalid method call id={:?}", id), } diff --git a/helix-view/src/document.rs b/helix-view/src/document.rs index 277b582eb..06a708f05 100644 --- a/helix-view/src/document.rs +++ b/helix-view/src/document.rs @@ -1056,13 +1056,8 @@ impl Document { if !language_server.is_initialized() { continue; } - if let Some(notification) = identifier - .clone() - .and_then(|id| language_server.text_document_did_save(id, &text)) - { - if let Err(err) = notification.await { - log::error!("Failed to send textDocument/didSave: {err}"); - } + if let Some(id) = identifier.clone() { + language_server.text_document_did_save(id, &text); } } @@ -1462,16 +1457,12 @@ impl Document { // TODO: move to hook // emit lsp notification for language_server in self.language_servers() { - let notify = language_server.text_document_did_change( + let _ = language_server.text_document_did_change( self.versioned_identifier(), &old_doc, self.text(), changes, ); - - if let Some(notify) = notify { - tokio::spawn(notify); - } } } diff --git a/helix-view/src/editor.rs b/helix-view/src/editor.rs index 00fe719d9..739dcfb49 100644 --- a/helix-view/src/editor.rs +++ b/helix-view/src/editor.rs @@ -1416,9 +1416,7 @@ impl Editor { if !ls.is_initialized() { continue; } - if let Some(notification) = ls.did_rename(old_path, &new_path, is_dir) { - tokio::spawn(notification); - }; + ls.did_rename(old_path, &new_path, is_dir); } self.language_servers .file_event_handler @@ -1441,7 +1439,7 @@ impl Editor { } // if we are open in LSPs send did_close notification for language_server in doc.language_servers() { - tokio::spawn(language_server.text_document_did_close(doc.identifier())); + language_server.text_document_did_close(doc.identifier()); } } // we need to clear the list of language servers here so that @@ -1522,7 +1520,7 @@ impl Editor { }); for (_, language_server) in doc_language_servers_not_in_registry { - tokio::spawn(language_server.text_document_did_close(doc.identifier())); + language_server.text_document_did_close(doc.identifier()); } let language_servers_not_in_doc = language_servers.iter().filter(|(name, ls)| { @@ -1533,12 +1531,12 @@ impl Editor { for (_, language_server) in language_servers_not_in_doc { // TODO: this now races with on_init code if the init happens too quickly - tokio::spawn(language_server.text_document_did_open( + language_server.text_document_did_open( doc_url.clone(), doc.version(), doc.text(), language_id.clone(), - )); + ); } doc.language_servers = language_servers; @@ -1797,8 +1795,7 @@ impl Editor { self.saves.remove(&doc_id); for language_server in doc.language_servers() { - // TODO: track error - tokio::spawn(language_server.text_document_did_close(doc.identifier())); + language_server.text_document_did_close(doc.identifier()); } enum Action {