mod_smacks: Split resumption into multiple stages, to simplify ISR integration

This will allow us to return the success/failed as part of the SASL2 response,
and *then* perform the stanza sync as a second step.
This commit is contained in:
Matthew Wild 2022-08-26 19:07:36 +01:00
parent a018497a27
commit 6926340d75

View file

@ -107,6 +107,12 @@ local ack_errors = require"util.error".init("mod_smacks", xmlns_sm3, {
overflow = { condition = "resource-constraint", text = "Too many unacked stanzas remaining, session can't be resumed" }
});
local resume_errors = require "util.error".init("mod_smacks", xmlns_sm3, {
expired = { condition = "item-not-found", text = "Session expired, and cannot be resumed" };
already_bound = { condition = "unexpected-request", text = "Cannot resume another session after a resource is bound" };
unknown_session = { condition = "item-not-found", text = "Unknown session" };
});
-- COMPAT note the use of compatibility wrapper in events (queue:table())
local function ack_delayed(session, stanza)
@ -527,13 +533,10 @@ end
module:hook("s2sout-destroyed", handle_s2s_destroyed);
module:hook("s2sin-destroyed", handle_s2s_destroyed);
function handle_resume(session, stanza, xmlns_sm)
function do_resume(session, stanza)
if session.full_jid then
session.log("warn", "Tried to resume after resource binding");
session.send(st.stanza("failed", { xmlns = xmlns_sm })
:tag("unexpected-request", { xmlns = xmlns_errors })
);
return true;
return nil, resume_errors.new("already_bound");
end
local id = stanza.attr.previd;
@ -542,78 +545,94 @@ function handle_resume(session, stanza, xmlns_sm)
local old_session = old_session_registry:get(session.username, id);
if old_session then
session.log("debug", "Tried to resume old expired session with id %s", id);
session.send(st.stanza("failed", { xmlns = xmlns_sm, h = format_h(old_session.h) })
:tag("item-not-found", { xmlns = xmlns_errors })
);
clear_old_session(session, id);
resumption_expired(1);
else
session.log("debug", "Tried to resume non-existent session with id %s", id);
session.send(st.stanza("failed", { xmlns = xmlns_sm })
:tag("item-not-found", { xmlns = xmlns_errors })
);
end;
else
if original_session.hibernating_watchdog then
original_session.log("debug", "Letting the watchdog go");
original_session.hibernating_watchdog:cancel();
original_session.hibernating_watchdog = nil;
elseif session.hibernating then
original_session.log("error", "Hibernating session has no watchdog!")
return nil, resume_errors.new("expired", { h = old_session.h });
end
-- zero age = was not hibernating yet
local age = 0;
if original_session.hibernating then
local now = os_time();
age = now - original_session.hibernating;
end
session.log("debug", "mod_smacks resuming existing session %s...", original_session.id);
local queue = original_session.outgoing_stanza_queue;
local h = tonumber(stanza.attr.h);
original_session.log("debug", "Pre-resumption #queue = %d", queue:count_unacked())
local acked, err = ack_errors.coerce(queue:ack(h)); -- luacheck: ignore 211/acked
if not err and not queue:resumable() then
err = ack_errors.new("overflow");
end
if err then
session.send(st.stanza("failed",
{ xmlns = xmlns_sm; h = format_h(original_session.handled_stanza_count); previd = id }));
session.log("debug", "Resumption failed: %s", err);
return true;
end
-- Update original_session with the parameters (connection, etc.) from the new session
sessionmanager.update_session(original_session, session);
-- Inform client of successful resumption
original_session.send(st.stanza("resumed", { xmlns = xmlns_sm,
h = format_h(original_session.handled_stanza_count), previd = id }));
-- Ok, we need to re-send any stanzas that the client didn't see
-- ...they are what is now left in the outgoing stanza queue
-- We have to use the send of "session" because we don't want to add our resent stanzas
-- to the outgoing queue again
original_session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked());
for _, queued_stanza in queue:resume() do
original_session.send(queued_stanza);
end
session.log("debug", "all stanzas resent, enabling stream management on resumed stream, #queue = %d", queue:count_unacked());
-- Add our own handlers to the resumed session (filters have been reset in the update)
wrap_session(original_session, true);
-- Let everyone know that we are no longer hibernating
module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()});
original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption
request_ack_now_if_needed(original_session, true, "handle_resume", nil);
resumption_age:sample(age);
session.log("debug", "Tried to resume non-existent session with id %s", id);
return nil, resume_errors.new("unknown_session");
end
if original_session.hibernating_watchdog then
original_session.log("debug", "Letting the watchdog go");
original_session.hibernating_watchdog:cancel();
original_session.hibernating_watchdog = nil;
elseif session.hibernating then
original_session.log("error", "Hibernating session has no watchdog!")
end
-- zero age = was not hibernating yet
local age = 0;
if original_session.hibernating then
local now = os_time();
age = now - original_session.hibernating;
end
session.log("debug", "mod_smacks resuming existing session %s...", original_session.id);
local queue = original_session.outgoing_stanza_queue;
local h = tonumber(stanza.attr.h);
original_session.log("debug", "Pre-resumption #queue = %d", queue:count_unacked())
local acked, err = ack_errors.coerce(queue:ack(h)); -- luacheck: ignore 211/acked
if not err and not queue:resumable() then
err = ack_errors.new("overflow");
end
if err then
session.log("debug", "Resumption failed: %s", err);
return nil, err;
end
-- Update original_session with the parameters (connection, etc.) from the new session
sessionmanager.update_session(original_session, session);
return {
session = original_session;
id = id;
-- Return function to complete the resumption and resync unacked stanzas
-- This is two steps so we can support SASL2/ISR
finish = function ()
-- Ok, we need to re-send any stanzas that the client didn't see
-- ...they are what is now left in the outgoing stanza queue
-- We have to use the send of "session" because we don't want to add our resent stanzas
-- to the outgoing queue again
original_session.log("debug", "resending all unacked stanzas that are still queued after resume, #queue = %d", queue:count_unacked());
for _, queued_stanza in queue:resume() do
original_session.send(queued_stanza);
end
original_session.log("debug", "all stanzas resent, enabling stream management on resumed stream, #queue = %d", queue:count_unacked());
-- Add our own handlers to the resumed session (filters have been reset in the update)
wrap_session(original_session, true);
-- Let everyone know that we are no longer hibernating
module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()});
original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption
request_ack_now_if_needed(original_session, true, "handle_resume", nil);
resumption_age:sample(age);
end;
};
end
function handle_resume(session, stanza, xmlns_sm)
local resumed, err = do_resume(session, stanza);
if not resumed then
session.send(st.stanza("failed", { xmlns = xmlns_sm, h = format_h(err.context.h) })
:tag(err.condition, { xmlns = xmlns_errors }));
return true;
end
session = resumed.session;
-- Inform client of successful resumption
session.send(st.stanza("resumed", { xmlns = xmlns_sm,
h = format_h(session.handled_stanza_count), previd = resumed.id }));
-- Complete resume (sync stanzas, etc.)
resumed.finish();
return true;
end