mirror of
https://github.com/bjc/prosody.git
synced 2025-04-05 06:07:37 +03:00
mod_smacks: Clean up compat code etc
Unstoppable stoppable timer compat not needed since 26f54b462601 ca 0.11.0 module:hook_stanza was renamed in 2012 No idea what was going on with the indentation and such
This commit is contained in:
parent
5e86776f0c
commit
ea20acace0
1 changed files with 39 additions and 54 deletions
|
@ -12,8 +12,7 @@
|
||||||
--
|
--
|
||||||
|
|
||||||
local st = require "util.stanza";
|
local st = require "util.stanza";
|
||||||
local dep = require "util.dependencies";
|
local cache = require "util.cache";
|
||||||
local cache = dep.softreq("util.cache"); -- only available in prosody 0.10+
|
|
||||||
local uuid_generate = require "util.uuid".generate;
|
local uuid_generate = require "util.uuid".generate;
|
||||||
local jid = require "util.jid";
|
local jid = require "util.jid";
|
||||||
|
|
||||||
|
@ -104,20 +103,7 @@ local session_registry = init_session_cache(max_hibernated_sessions, function(re
|
||||||
return true; -- allow session to be removed from full cache to make room for new one
|
return true; -- allow session to be removed from full cache to make room for new one
|
||||||
end);
|
end);
|
||||||
|
|
||||||
local function stoppable_timer(delay, callback)
|
local function ack_delayed(session, stanza)
|
||||||
local stopped = false;
|
|
||||||
local timer = module:add_timer(delay, function (t)
|
|
||||||
if stopped then return; end
|
|
||||||
return callback(t);
|
|
||||||
end);
|
|
||||||
if timer and timer.stop then return timer; end -- new prosody api includes stop() function
|
|
||||||
return {
|
|
||||||
stop = function(self) stopped = true end;
|
|
||||||
timer;
|
|
||||||
};
|
|
||||||
end
|
|
||||||
|
|
||||||
local function delayed_ack_function(session, stanza)
|
|
||||||
-- fire event only if configured to do so and our session is not already hibernated or destroyed
|
-- fire event only if configured to do so and our session is not already hibernated or destroyed
|
||||||
if delayed_ack_timeout > 0 and session.awaiting_ack
|
if delayed_ack_timeout > 0 and session.awaiting_ack
|
||||||
and not session.hibernating and not session.destroyed then
|
and not session.hibernating and not session.destroyed then
|
||||||
|
@ -162,7 +148,6 @@ module:hook("s2s-stream-features",
|
||||||
local function request_ack_if_needed(session, force, reason, stanza)
|
local function request_ack_if_needed(session, force, reason, stanza)
|
||||||
local queue = session.outgoing_stanza_queue;
|
local queue = session.outgoing_stanza_queue;
|
||||||
local expected_h = session.last_acknowledged_stanza + #queue;
|
local expected_h = session.last_acknowledged_stanza + #queue;
|
||||||
-- session.log("debug", "*** SMACKS(1) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating));
|
|
||||||
local max_unacked = max_unacked_stanzas;
|
local max_unacked = max_unacked_stanzas;
|
||||||
if session.state == "inactive" then
|
if session.state == "inactive" then
|
||||||
max_unacked = max_inactive_unacked_stanzas;
|
max_unacked = max_inactive_unacked_stanzas;
|
||||||
|
@ -171,11 +156,10 @@ local function request_ack_if_needed(session, force, reason, stanza)
|
||||||
-- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong
|
-- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong
|
||||||
-- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any
|
-- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any
|
||||||
-- further requests until a higher h-value would be expected.
|
-- further requests until a higher h-value would be expected.
|
||||||
-- session.log("debug", "*** SMACKS(2) ***: #queue=%s, max_unacked_stanzas=%s, expected_h=%s, last_requested_h=%s", tostring(#queue), tostring(max_unacked_stanzas), tostring(expected_h), tostring(session.last_requested_h));
|
|
||||||
if (#queue > max_unacked and expected_h ~= session.last_requested_h) or force then
|
if (#queue > max_unacked and expected_h ~= session.last_requested_h) or force then
|
||||||
session.log("debug", "Queuing <r> (in a moment) from %s - #queue=%d", reason, #queue);
|
session.log("debug", "Queuing <r> (in a moment) from %s - #queue=%d", reason, #queue);
|
||||||
session.awaiting_ack = false;
|
session.awaiting_ack = false;
|
||||||
session.awaiting_ack_timer = stoppable_timer(1e-06, function ()
|
session.awaiting_ack_timer = module:add_timer(1e-06, function ()
|
||||||
-- session.log("debug", "*** SMACKS(3) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating));
|
-- session.log("debug", "*** SMACKS(3) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating));
|
||||||
-- only request ack if needed and our session is not already hibernated or destroyed
|
-- only request ack if needed and our session is not already hibernated or destroyed
|
||||||
if not session.awaiting_ack and not session.hibernating and not session.destroyed then
|
if not session.awaiting_ack and not session.hibernating and not session.destroyed then
|
||||||
|
@ -187,8 +171,8 @@ local function request_ack_if_needed(session, force, reason, stanza)
|
||||||
session.last_requested_h = session.last_acknowledged_stanza + #queue;
|
session.last_requested_h = session.last_acknowledged_stanza + #queue;
|
||||||
session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue);
|
session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue);
|
||||||
if not session.delayed_ack_timer then
|
if not session.delayed_ack_timer then
|
||||||
session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function()
|
session.delayed_ack_timer = module:add_timer(delayed_ack_timeout, function()
|
||||||
delayed_ack_function(session, nil); -- we don't know if this is the only new stanza in the queue
|
ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue
|
||||||
end);
|
end);
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -201,8 +185,8 @@ local function request_ack_if_needed(session, force, reason, stanza)
|
||||||
-- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event
|
-- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event
|
||||||
-- would not trigger this event (again).
|
-- would not trigger this event (again).
|
||||||
if #queue > max_unacked and session.awaiting_ack and session.delayed_ack_timer == nil then
|
if #queue > max_unacked and session.awaiting_ack and session.delayed_ack_timer == nil then
|
||||||
session.log("debug", "Calling delayed_ack_function directly (still waiting for ack)");
|
session.log("debug", "Calling ack_delayed directly (still waiting for ack)");
|
||||||
delayed_ack_function(session, stanza); -- this is the only new stanza in the queue --> provide it to other modules
|
ack_delayed(session, stanza); -- this is the only new stanza in the queue --> provide it to other modules
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -309,12 +293,12 @@ function handle_enable(session, stanza, xmlns_sm)
|
||||||
(session.sends2s or session.send)(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume, max = tostring(resume_timeout) }));
|
(session.sends2s or session.send)(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume, max = tostring(resume_timeout) }));
|
||||||
return true;
|
return true;
|
||||||
end
|
end
|
||||||
module:hook_stanza(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100);
|
module:hook_tag(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100);
|
||||||
module:hook_stanza(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100);
|
module:hook_tag(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100);
|
||||||
|
|
||||||
module:hook_stanza("http://etherx.jabber.org/streams", "features",
|
module:hook_tag("http://etherx.jabber.org/streams", "features",
|
||||||
function (session, stanza)
|
function (session, stanza)
|
||||||
stoppable_timer(1e-6, function ()
|
module:add_timer(1e-6, function ()
|
||||||
if can_do_smacks(session) then
|
if can_do_smacks(session) then
|
||||||
if stanza:get_child("sm", xmlns_sm3) then
|
if stanza:get_child("sm", xmlns_sm3) then
|
||||||
session.sends2s(st.stanza("enable", sm3_attr));
|
session.sends2s(st.stanza("enable", sm3_attr));
|
||||||
|
@ -330,7 +314,7 @@ module:hook_stanza("http://etherx.jabber.org/streams", "features",
|
||||||
end);
|
end);
|
||||||
end);
|
end);
|
||||||
|
|
||||||
function handle_enabled(session, stanza, xmlns_sm)
|
function handle_enabled(session, stanza, xmlns_sm) -- luacheck: ignore 212/stanza
|
||||||
module:log("debug", "Enabling stream management");
|
module:log("debug", "Enabling stream management");
|
||||||
session.smacks = xmlns_sm;
|
session.smacks = xmlns_sm;
|
||||||
|
|
||||||
|
@ -340,10 +324,10 @@ function handle_enabled(session, stanza, xmlns_sm)
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
end
|
end
|
||||||
module:hook_stanza(xmlns_sm2, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm2); end, 100);
|
module:hook_tag(xmlns_sm2, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm2); end, 100);
|
||||||
module:hook_stanza(xmlns_sm3, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm3); end, 100);
|
module:hook_tag(xmlns_sm3, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm3); end, 100);
|
||||||
|
|
||||||
function handle_r(origin, stanza, xmlns_sm)
|
function handle_r(origin, stanza, xmlns_sm) -- luacheck: ignore 212/stanza
|
||||||
if not origin.smacks then
|
if not origin.smacks then
|
||||||
module:log("debug", "Received ack request from non-smack-enabled session");
|
module:log("debug", "Received ack request from non-smack-enabled session");
|
||||||
return;
|
return;
|
||||||
|
@ -358,8 +342,8 @@ function handle_r(origin, stanza, xmlns_sm)
|
||||||
end
|
end
|
||||||
return true;
|
return true;
|
||||||
end
|
end
|
||||||
module:hook_stanza(xmlns_sm2, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm2); end);
|
module:hook_tag(xmlns_sm2, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm2); end);
|
||||||
module:hook_stanza(xmlns_sm3, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm3); end);
|
module:hook_tag(xmlns_sm3, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm3); end);
|
||||||
|
|
||||||
function handle_a(origin, stanza)
|
function handle_a(origin, stanza)
|
||||||
if not origin.smacks then return; end
|
if not origin.smacks then return; end
|
||||||
|
@ -391,7 +375,7 @@ function handle_a(origin, stanza)
|
||||||
return;
|
return;
|
||||||
end
|
end
|
||||||
|
|
||||||
for i=1,math_min(handled_stanza_count,#queue) do
|
for _=1,math_min(handled_stanza_count,#queue) do
|
||||||
local handled_stanza = t_remove(origin.outgoing_stanza_queue, 1);
|
local handled_stanza = t_remove(origin.outgoing_stanza_queue, 1);
|
||||||
module:fire_event("delivery/success", { session = origin, stanza = handled_stanza });
|
module:fire_event("delivery/success", { session = origin, stanza = handled_stanza });
|
||||||
end
|
end
|
||||||
|
@ -401,8 +385,8 @@ function handle_a(origin, stanza)
|
||||||
request_ack_if_needed(origin, false, "handle_a", nil)
|
request_ack_if_needed(origin, false, "handle_a", nil)
|
||||||
return true;
|
return true;
|
||||||
end
|
end
|
||||||
module:hook_stanza(xmlns_sm2, "a", handle_a);
|
module:hook_tag(xmlns_sm2, "a", handle_a);
|
||||||
module:hook_stanza(xmlns_sm3, "a", handle_a);
|
module:hook_tag(xmlns_sm3, "a", handle_a);
|
||||||
|
|
||||||
--TODO: Optimise... incoming stanzas should be handled by a per-session
|
--TODO: Optimise... incoming stanzas should be handled by a per-session
|
||||||
-- function that has a counter as an upvalue (no table indexing for increments,
|
-- function that has a counter as an upvalue (no table indexing for increments,
|
||||||
|
@ -472,7 +456,7 @@ module:hook("delivery/failure", function(event)
|
||||||
end);
|
end);
|
||||||
|
|
||||||
module:hook("pre-resource-unbind", function (event)
|
module:hook("pre-resource-unbind", function (event)
|
||||||
local session, err = event.session, event.error;
|
local session = event.session;
|
||||||
if session.smacks then
|
if session.smacks then
|
||||||
if not session.resumption_token then
|
if not session.resumption_token then
|
||||||
local queue = session.outgoing_stanza_queue;
|
local queue = session.outgoing_stanza_queue;
|
||||||
|
@ -492,7 +476,7 @@ module:hook("pre-resource-unbind", function (event)
|
||||||
-- matches the smacks session this timer is for in case it changed
|
-- matches the smacks session this timer is for in case it changed
|
||||||
-- (for example, the client may have bound a new resource and
|
-- (for example, the client may have bound a new resource and
|
||||||
-- started a new smacks session, or not be using smacks)
|
-- started a new smacks session, or not be using smacks)
|
||||||
local curr_session = full_sessions[session.full_jid];
|
local curr_session = prosody.full_sessions[session.full_jid];
|
||||||
if session.destroyed then
|
if session.destroyed then
|
||||||
session.log("debug", "The session has already been destroyed");
|
session.log("debug", "The session has already been destroyed");
|
||||||
elseif curr_session and curr_session.resumption_token == resumption_token
|
elseif curr_session and curr_session.resumption_token == resumption_token
|
||||||
|
@ -509,7 +493,8 @@ module:hook("pre-resource-unbind", function (event)
|
||||||
return resume_timeout;
|
return resume_timeout;
|
||||||
end
|
end
|
||||||
if session.push_identifier ~= nil and current_time-timeout_start < resume_timeout then
|
if session.push_identifier ~= nil and current_time-timeout_start < resume_timeout then
|
||||||
session.log("debug", "A push happened since hibernation started, hibernating session for up to %d extra seconds", resume_timeout-(current_time-timeout_start));
|
session.log("debug", "A push happened since hibernation started, hibernating session for up to %d extra seconds",
|
||||||
|
resume_timeout - (current_time - timeout_start));
|
||||||
return resume_timeout-(current_time-timeout_start); -- time left to wait
|
return resume_timeout-(current_time-timeout_start); -- time left to wait
|
||||||
end
|
end
|
||||||
session.log("debug", "Destroying session for hibernating too long");
|
session.log("debug", "Destroying session for hibernating too long");
|
||||||
|
@ -626,7 +611,7 @@ function handle_resume(session, stanza, xmlns_sm)
|
||||||
session.send(queue[i]);
|
session.send(queue[i]);
|
||||||
end
|
end
|
||||||
session.log("debug", "all stanzas resent, now disabling send() in this migrated session, #queue = %d", #queue);
|
session.log("debug", "all stanzas resent, now disabling send() in this migrated session, #queue = %d", #queue);
|
||||||
function session.send(stanza)
|
function session.send(stanza) -- luacheck: ignore 432
|
||||||
migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza));
|
migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza));
|
||||||
return false;
|
return false;
|
||||||
end
|
end
|
||||||
|
@ -641,8 +626,8 @@ function handle_resume(session, stanza, xmlns_sm)
|
||||||
end
|
end
|
||||||
return true;
|
return true;
|
||||||
end
|
end
|
||||||
module:hook_stanza(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end);
|
module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end);
|
||||||
module:hook_stanza(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end);
|
module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end);
|
||||||
|
|
||||||
module:hook("csi-client-active", function (event)
|
module:hook("csi-client-active", function (event)
|
||||||
if event.origin.smacks then
|
if event.origin.smacks then
|
||||||
|
@ -678,8 +663,8 @@ local function handle_read_timeout(event)
|
||||||
(session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }));
|
(session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }));
|
||||||
session.awaiting_ack = true;
|
session.awaiting_ack = true;
|
||||||
if not session.delayed_ack_timer then
|
if not session.delayed_ack_timer then
|
||||||
session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function()
|
session.delayed_ack_timer = module:add_timer(delayed_ack_timeout, function()
|
||||||
delayed_ack_function(session, nil);
|
ack_delayed(session, nil);
|
||||||
end);
|
end);
|
||||||
end
|
end
|
||||||
return true;
|
return true;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue