LSP: Remove future wrapper from Client::notify, Client::reply

Previously LSP notifications were sent within a future and most callers
used a `tokio::spawn` to send the notification, silently discarding any
failures like problems serializing parameters or sending on the channel.
It's possible that tokio could schedule futures out of intended order
though which could cause notifications where order is important, like
document synchronization, to become partially shuffled. This change
removes the future wrapper and logs all internal failures.

Also included in this commit is the same change for `Client::reply`
which was also unnecessarily wrapped in a future.

Co-authored-by: Pascal Kuthe <pascalkuthe@pm.me>
This commit is contained in:
Michael Davis 2025-02-01 12:10:42 -05:00
parent 0ea401d2d7
commit 5532ef35d9
No known key found for this signature in database
6 changed files with 93 additions and 122 deletions

View file

@ -170,7 +170,7 @@ impl Client {
// and that we can therefore reuse the client (but are done now) // and that we can therefore reuse the client (but are done now)
return; return;
} }
tokio::spawn(self.did_change_workspace(vec![workspace_for_uri(root_uri)], Vec::new())); self.did_change_workspace(vec![workspace_for_uri(root_uri)], Vec::new())
} }
#[allow(clippy::type_complexity, clippy::too_many_arguments)] #[allow(clippy::type_complexity, clippy::too_many_arguments)]
@ -456,17 +456,23 @@ impl Client {
} }
/// Send a RPC notification to the language server. /// Send a RPC notification to the language server.
pub fn notify<R: lsp::notification::Notification>( pub fn notify<R: lsp::notification::Notification>(&self, params: R::Params)
&self,
params: R::Params,
) -> impl Future<Output = Result<()>>
where where
R::Params: serde::Serialize, R::Params: serde::Serialize,
{ {
let server_tx = self.server_tx.clone(); let server_tx = self.server_tx.clone();
async move { let params = match serde_json::to_value(params) {
let params = serde_json::to_value(params)?; Ok(params) => params,
Err(err) => {
log::error!(
"Failed to serialize params for notification '{}' for server '{}': {err}",
R::METHOD,
self.name,
);
return;
}
};
let notification = jsonrpc::Notification { let notification = jsonrpc::Notification {
jsonrpc: Some(jsonrpc::Version::V2), jsonrpc: Some(jsonrpc::Version::V2),
@ -474,11 +480,12 @@ impl Client {
params: Self::value_into_params(params), params: Self::value_into_params(params),
}; };
server_tx if let Err(err) = server_tx.send(Payload::Notification(notification)) {
.send(Payload::Notification(notification)) log::error!(
.map_err(|e| Error::Other(e.into()))?; "Failed to send notification '{}' to server '{}': {err}",
R::METHOD,
Ok(()) self.name
);
} }
} }
@ -487,17 +494,16 @@ impl Client {
&self, &self,
id: jsonrpc::Id, id: jsonrpc::Id,
result: core::result::Result<Value, jsonrpc::Error>, result: core::result::Result<Value, jsonrpc::Error>,
) -> impl Future<Output = Result<()>> { ) -> Result<()> {
use jsonrpc::{Failure, Output, Success, Version}; use jsonrpc::{Failure, Output, Success, Version};
let server_tx = self.server_tx.clone(); let server_tx = self.server_tx.clone();
async move {
let output = match result { let output = match result {
Ok(result) => Output::Success(Success { Ok(result) => Output::Success(Success {
jsonrpc: Some(Version::V2), jsonrpc: Some(Version::V2),
id, id,
result: serde_json::to_value(result)?, result,
}), }),
Err(error) => Output::Failure(Failure { Err(error) => Output::Failure(Failure {
jsonrpc: Some(Version::V2), jsonrpc: Some(Version::V2),
@ -512,7 +518,6 @@ impl Client {
Ok(()) Ok(())
} }
}
// ------------------------------------------------------------------------------------------- // -------------------------------------------------------------------------------------------
// General messages // General messages
@ -693,7 +698,7 @@ impl Client {
self.request::<lsp::request::Shutdown>(()).await self.request::<lsp::request::Shutdown>(()).await
} }
pub fn exit(&self) -> impl Future<Output = Result<()>> { pub fn exit(&self) {
self.notify::<lsp::notification::Exit>(()) self.notify::<lsp::notification::Exit>(())
} }
@ -701,7 +706,8 @@ impl Client {
/// early if server responds with an error. /// early if server responds with an error.
pub async fn shutdown_and_exit(&self) -> Result<()> { pub async fn shutdown_and_exit(&self) -> Result<()> {
self.shutdown().await?; self.shutdown().await?;
self.exit().await self.exit();
Ok(())
} }
/// Forcefully shuts down the language server ignoring any errors. /// Forcefully shuts down the language server ignoring any errors.
@ -709,24 +715,21 @@ impl Client {
if let Err(e) = self.shutdown().await { if let Err(e) = self.shutdown().await {
log::warn!("language server failed to terminate gracefully - {}", e); log::warn!("language server failed to terminate gracefully - {}", e);
} }
self.exit().await self.exit();
Ok(())
} }
// ------------------------------------------------------------------------------------------- // -------------------------------------------------------------------------------------------
// Workspace // Workspace
// ------------------------------------------------------------------------------------------- // -------------------------------------------------------------------------------------------
pub fn did_change_configuration(&self, settings: Value) -> impl Future<Output = Result<()>> { pub fn did_change_configuration(&self, settings: Value) {
self.notify::<lsp::notification::DidChangeConfiguration>( self.notify::<lsp::notification::DidChangeConfiguration>(
lsp::DidChangeConfigurationParams { settings }, lsp::DidChangeConfigurationParams { settings },
) )
} }
pub fn did_change_workspace( pub fn did_change_workspace(&self, added: Vec<WorkspaceFolder>, removed: Vec<WorkspaceFolder>) {
&self,
added: Vec<WorkspaceFolder>,
removed: Vec<WorkspaceFolder>,
) -> impl Future<Output = Result<()>> {
self.notify::<DidChangeWorkspaceFolders>(DidChangeWorkspaceFoldersParams { self.notify::<DidChangeWorkspaceFolders>(DidChangeWorkspaceFoldersParams {
event: WorkspaceFoldersChangeEvent { added, removed }, event: WorkspaceFoldersChangeEvent { added, removed },
}) })
@ -766,12 +769,7 @@ impl Client {
}) })
} }
pub fn did_rename( pub fn did_rename(&self, old_path: &Path, new_path: &Path, is_dir: bool) -> Option<()> {
&self,
old_path: &Path,
new_path: &Path,
is_dir: bool,
) -> Option<impl Future<Output = std::result::Result<(), Error>>> {
let capabilities = self.file_operations_intests(); let capabilities = self.file_operations_intests();
if !capabilities.did_rename.has_interest(new_path, is_dir) { if !capabilities.did_rename.has_interest(new_path, is_dir) {
return None; return None;
@ -789,7 +787,8 @@ impl Client {
old_uri: url_from_path(old_path)?, old_uri: url_from_path(old_path)?,
new_uri: url_from_path(new_path)?, new_uri: url_from_path(new_path)?,
}]; }];
Some(self.notify::<lsp::notification::DidRenameFiles>(lsp::RenameFilesParams { files })) self.notify::<lsp::notification::DidRenameFiles>(lsp::RenameFilesParams { files });
Some(())
} }
// ------------------------------------------------------------------------------------------- // -------------------------------------------------------------------------------------------
@ -802,7 +801,7 @@ impl Client {
version: i32, version: i32,
doc: &Rope, doc: &Rope,
language_id: String, language_id: String,
) -> impl Future<Output = Result<()>> { ) {
self.notify::<lsp::notification::DidOpenTextDocument>(lsp::DidOpenTextDocumentParams { self.notify::<lsp::notification::DidOpenTextDocument>(lsp::DidOpenTextDocumentParams {
text_document: lsp::TextDocumentItem { text_document: lsp::TextDocumentItem {
uri, uri,
@ -929,7 +928,7 @@ impl Client {
old_text: &Rope, old_text: &Rope,
new_text: &Rope, new_text: &Rope,
changes: &ChangeSet, changes: &ChangeSet,
) -> Option<impl Future<Output = Result<()>>> { ) -> Option<()> {
let capabilities = self.capabilities.get().unwrap(); let capabilities = self.capabilities.get().unwrap();
// Return early if the server does not support document sync. // Return early if the server does not support document sync.
@ -961,18 +960,14 @@ impl Client {
kind => unimplemented!("{:?}", kind), kind => unimplemented!("{:?}", kind),
}; };
Some(self.notify::<lsp::notification::DidChangeTextDocument>( self.notify::<lsp::notification::DidChangeTextDocument>(lsp::DidChangeTextDocumentParams {
lsp::DidChangeTextDocumentParams {
text_document, text_document,
content_changes: changes, content_changes: changes,
}, });
)) Some(())
} }
pub fn text_document_did_close( pub fn text_document_did_close(&self, text_document: lsp::TextDocumentIdentifier) {
&self,
text_document: lsp::TextDocumentIdentifier,
) -> impl Future<Output = Result<()>> {
self.notify::<lsp::notification::DidCloseTextDocument>(lsp::DidCloseTextDocumentParams { self.notify::<lsp::notification::DidCloseTextDocument>(lsp::DidCloseTextDocumentParams {
text_document, text_document,
}) })
@ -984,7 +979,7 @@ impl Client {
&self, &self,
text_document: lsp::TextDocumentIdentifier, text_document: lsp::TextDocumentIdentifier,
text: &Rope, text: &Rope,
) -> Option<impl Future<Output = Result<()>>> { ) -> Option<()> {
let capabilities = self.capabilities.get().unwrap(); let capabilities = self.capabilities.get().unwrap();
let include_text = match &capabilities.text_document_sync.as_ref()? { let include_text = match &capabilities.text_document_sync.as_ref()? {
@ -1002,12 +997,11 @@ impl Client {
lsp::TextDocumentSyncCapability::Kind(..) => false, lsp::TextDocumentSyncCapability::Kind(..) => false,
}; };
Some(self.notify::<lsp::notification::DidSaveTextDocument>( self.notify::<lsp::notification::DidSaveTextDocument>(lsp::DidSaveTextDocumentParams {
lsp::DidSaveTextDocumentParams {
text_document, text_document,
text: include_text.then_some(text.into()), text: include_text.then_some(text.into()),
}, });
)) Some(())
} }
pub fn completion( pub fn completion(
@ -1540,10 +1534,7 @@ impl Client {
Some(self.call::<lsp::request::ExecuteCommand>(params)) Some(self.call::<lsp::request::ExecuteCommand>(params))
} }
pub fn did_change_watched_files( pub fn did_change_watched_files(&self, changes: Vec<lsp::FileEvent>) {
&self,
changes: Vec<lsp::FileEvent>,
) -> impl Future<Output = std::result::Result<(), Error>> {
self.notify::<lsp::notification::DidChangeWatchedFiles>(lsp::DidChangeWatchedFilesParams { self.notify::<lsp::notification::DidChangeWatchedFiles>(lsp::DidChangeWatchedFilesParams {
changes, changes,
}) })

View file

@ -113,17 +113,13 @@ impl Handler {
"Sending didChangeWatchedFiles notification to client '{}'", "Sending didChangeWatchedFiles notification to client '{}'",
client.name() client.name()
); );
if let Err(err) = crate::block_on(client client.did_change_watched_files(vec![lsp::FileEvent {
.did_change_watched_files(vec![lsp::FileEvent {
uri, uri,
// We currently always send the CHANGED state // We currently always send the CHANGED state
// since we don't actually have more context at // since we don't actually have more context at
// the moment. // the moment.
typ: lsp::FileChangeType::CHANGED, typ: lsp::FileChangeType::CHANGED,
}])) }]);
{
log::warn!("Failed to send didChangeWatchedFiles notification to client: {err}");
}
true true
}); });
} }

View file

@ -900,17 +900,7 @@ fn start_client(
} }
// next up, notify<initialized> // next up, notify<initialized>
let notification_result = _client _client.notify::<lsp::notification::Initialized>(lsp::InitializedParams {});
.notify::<lsp::notification::Initialized>(lsp::InitializedParams {})
.await;
if let Err(e) = notification_result {
log::error!(
"failed to notify language server of its initialization: {}",
e
);
return;
}
initialize_notify.notify_one(); initialize_notify.notify_one();
}); });

View file

@ -722,7 +722,7 @@ impl Application {
// This might not be required by the spec but Neovim does this as well, so it's // This might not be required by the spec but Neovim does this as well, so it's
// probably a good idea for compatibility. // probably a good idea for compatibility.
if let Some(config) = language_server.config() { if let Some(config) = language_server.config() {
tokio::spawn(language_server.did_change_configuration(config.clone())); language_server.did_change_configuration(config.clone());
} }
let docs = self let docs = self
@ -740,12 +740,12 @@ impl Application {
let language_id = let language_id =
doc.language_id().map(ToOwned::to_owned).unwrap_or_default(); doc.language_id().map(ToOwned::to_owned).unwrap_or_default();
tokio::spawn(language_server.text_document_did_open( language_server.text_document_did_open(
url, url,
doc.version(), doc.version(),
doc.text(), doc.text(),
language_id, language_id,
)); );
} }
} }
Notification::PublishDiagnostics(mut params) => { Notification::PublishDiagnostics(mut params) => {
@ -1131,7 +1131,13 @@ impl Application {
} }
}; };
tokio::spawn(language_server!().reply(id, reply)); let language_server = language_server!();
if let Err(err) = language_server.reply(id.clone(), reply) {
log::error!(
"Failed to send reply to server '{}' request {id}: {err}",
language_server.name()
);
}
} }
Call::Invalid { id } => log::error!("LSP invalid method call id={:?}", id), Call::Invalid { id } => log::error!("LSP invalid method call id={:?}", id),
} }

View file

@ -1056,13 +1056,8 @@ impl Document {
if !language_server.is_initialized() { if !language_server.is_initialized() {
continue; continue;
} }
if let Some(notification) = identifier if let Some(id) = identifier.clone() {
.clone() language_server.text_document_did_save(id, &text);
.and_then(|id| language_server.text_document_did_save(id, &text))
{
if let Err(err) = notification.await {
log::error!("Failed to send textDocument/didSave: {err}");
}
} }
} }
@ -1462,16 +1457,12 @@ impl Document {
// TODO: move to hook // TODO: move to hook
// emit lsp notification // emit lsp notification
for language_server in self.language_servers() { for language_server in self.language_servers() {
let notify = language_server.text_document_did_change( let _ = language_server.text_document_did_change(
self.versioned_identifier(), self.versioned_identifier(),
&old_doc, &old_doc,
self.text(), self.text(),
changes, changes,
); );
if let Some(notify) = notify {
tokio::spawn(notify);
}
} }
} }

View file

@ -1416,9 +1416,7 @@ impl Editor {
if !ls.is_initialized() { if !ls.is_initialized() {
continue; continue;
} }
if let Some(notification) = ls.did_rename(old_path, &new_path, is_dir) { ls.did_rename(old_path, &new_path, is_dir);
tokio::spawn(notification);
};
} }
self.language_servers self.language_servers
.file_event_handler .file_event_handler
@ -1441,7 +1439,7 @@ impl Editor {
} }
// if we are open in LSPs send did_close notification // if we are open in LSPs send did_close notification
for language_server in doc.language_servers() { for language_server in doc.language_servers() {
tokio::spawn(language_server.text_document_did_close(doc.identifier())); language_server.text_document_did_close(doc.identifier());
} }
} }
// we need to clear the list of language servers here so that // we need to clear the list of language servers here so that
@ -1522,7 +1520,7 @@ impl Editor {
}); });
for (_, language_server) in doc_language_servers_not_in_registry { for (_, language_server) in doc_language_servers_not_in_registry {
tokio::spawn(language_server.text_document_did_close(doc.identifier())); language_server.text_document_did_close(doc.identifier());
} }
let language_servers_not_in_doc = language_servers.iter().filter(|(name, ls)| { let language_servers_not_in_doc = language_servers.iter().filter(|(name, ls)| {
@ -1533,12 +1531,12 @@ impl Editor {
for (_, language_server) in language_servers_not_in_doc { for (_, language_server) in language_servers_not_in_doc {
// TODO: this now races with on_init code if the init happens too quickly // TODO: this now races with on_init code if the init happens too quickly
tokio::spawn(language_server.text_document_did_open( language_server.text_document_did_open(
doc_url.clone(), doc_url.clone(),
doc.version(), doc.version(),
doc.text(), doc.text(),
language_id.clone(), language_id.clone(),
)); );
} }
doc.language_servers = language_servers; doc.language_servers = language_servers;
@ -1797,8 +1795,7 @@ impl Editor {
self.saves.remove(&doc_id); self.saves.remove(&doc_id);
for language_server in doc.language_servers() { for language_server in doc.language_servers() {
// TODO: track error language_server.text_document_did_close(doc.identifier());
tokio::spawn(language_server.text_document_did_close(doc.identifier()));
} }
enum Action { enum Action {