mirror of
https://github.com/bjc/prosody.git
synced 2025-04-05 14:17:37 +03:00
mod_smacks: sprinkle some metrics on it
This commit is contained in:
parent
079a39c216
commit
f53f452e47
1 changed files with 45 additions and 2 deletions
|
@ -15,6 +15,35 @@ local tonumber = tonumber;
|
||||||
local tostring = tostring;
|
local tostring = tostring;
|
||||||
local os_time = os.time;
|
local os_time = os.time;
|
||||||
|
|
||||||
|
-- These metrics together allow to calculate an instantaneous
|
||||||
|
-- "unacked stanzas" metric in the graphing frontend, without us having to
|
||||||
|
-- iterate over all the queues.
|
||||||
|
local tx_queued_stanzas = module:measure("tx_queued_stanzas", "counter");
|
||||||
|
local tx_dropped_stanzas = module:metric(
|
||||||
|
"histogram",
|
||||||
|
"tx_dropped_stanzas", "", "number of stanzas in a queue which got dropped",
|
||||||
|
{},
|
||||||
|
{buckets = {0, 1, 2, 4, 8, 16, 32}}
|
||||||
|
):with_labels();
|
||||||
|
local tx_acked_stanzas = module:metric(
|
||||||
|
"histogram",
|
||||||
|
"tx_acked_stanzas", "", "number of items acked per ack received",
|
||||||
|
{},
|
||||||
|
{buckets = {0, 1, 2, 4, 8, 16, 32}}
|
||||||
|
):with_labels();
|
||||||
|
|
||||||
|
-- number of session resumptions attempts where the session had expired
|
||||||
|
local resumption_expired = module:measure("session_resumption_expired", "counter");
|
||||||
|
local resumption_age = module:metric(
|
||||||
|
"histogram",
|
||||||
|
"resumption_age", "seconds", "time the session had been hibernating at the time of a resumption",
|
||||||
|
{},
|
||||||
|
{buckets = { 0, 1, 2, 5, 10, 20, 50, 100, 200, 500 }}
|
||||||
|
):with_labels();
|
||||||
|
local sessions_expired = module:measure("sessions_expired", "counter");
|
||||||
|
local sessions_started = module:measure("sessions_started", "counter");
|
||||||
|
|
||||||
|
|
||||||
local datetime = require "util.datetime";
|
local datetime = require "util.datetime";
|
||||||
local add_filter = require "util.filters".add_filter;
|
local add_filter = require "util.filters".add_filter;
|
||||||
local jid = require "util.jid";
|
local jid = require "util.jid";
|
||||||
|
@ -168,6 +197,7 @@ local function outgoing_stanza_filter(stanza, session)
|
||||||
end
|
end
|
||||||
|
|
||||||
queue:push(cached_stanza);
|
queue:push(cached_stanza);
|
||||||
|
tx_queued_stanzas(1);
|
||||||
|
|
||||||
if session.hibernating then
|
if session.hibernating then
|
||||||
session.log("debug", "hibernating since %s, stanza queued", datetime.datetime(session.hibernating));
|
session.log("debug", "hibernating since %s, stanza queued", datetime.datetime(session.hibernating));
|
||||||
|
@ -229,6 +259,7 @@ end);
|
||||||
|
|
||||||
local function wrap_session_in(session, resume)
|
local function wrap_session_in(session, resume)
|
||||||
if not resume then
|
if not resume then
|
||||||
|
sessions_started(1);
|
||||||
session.handled_stanza_count = 0;
|
session.handled_stanza_count = 0;
|
||||||
end
|
end
|
||||||
add_filter(session, "stanzas/in", count_incoming_stanzas, 999);
|
add_filter(session, "stanzas/in", count_incoming_stanzas, 999);
|
||||||
|
@ -349,8 +380,9 @@ function handle_a(origin, stanza)
|
||||||
origin:close(err);
|
origin:close(err);
|
||||||
return;
|
return;
|
||||||
end
|
end
|
||||||
|
tx_acked_stanzas:sample(handled_stanza_count);
|
||||||
|
|
||||||
origin.log("debug", "#queue = %d", queue:count_unacked());
|
origin.log("debug", "#queue = %d (acked: %d)", queue:count_unacked(), handled_stanza_count);
|
||||||
request_ack_now_if_needed(origin, false, "handle_a", nil)
|
request_ack_now_if_needed(origin, false, "handle_a", nil)
|
||||||
return true;
|
return true;
|
||||||
end
|
end
|
||||||
|
@ -359,7 +391,9 @@ module:hook_tag(xmlns_sm3, "a", handle_a);
|
||||||
|
|
||||||
local function handle_unacked_stanzas(session)
|
local function handle_unacked_stanzas(session)
|
||||||
local queue = session.outgoing_stanza_queue;
|
local queue = session.outgoing_stanza_queue;
|
||||||
if queue:count_unacked() > 0 then
|
local unacked = queue:count_unacked()
|
||||||
|
if unacked > 0 then
|
||||||
|
tx_dropped_stanzas:sample(unacked);
|
||||||
session.smacks = false; -- Disable queueing
|
session.smacks = false; -- Disable queueing
|
||||||
session.outgoing_stanza_queue = nil;
|
session.outgoing_stanza_queue = nil;
|
||||||
for stanza in queue._queue:consume() do
|
for stanza in queue._queue:consume() do
|
||||||
|
@ -437,6 +471,7 @@ module:hook("pre-resource-unbind", function (event)
|
||||||
session.resumption_token = nil;
|
session.resumption_token = nil;
|
||||||
session.resending_unacked = true; -- stop outgoing_stanza_filter from re-queueing anything anymore
|
session.resending_unacked = true; -- stop outgoing_stanza_filter from re-queueing anything anymore
|
||||||
sessionmanager.destroy_session(session, "Hibernating too long");
|
sessionmanager.destroy_session(session, "Hibernating too long");
|
||||||
|
sessions_expired(1);
|
||||||
end);
|
end);
|
||||||
if session.conn then
|
if session.conn then
|
||||||
local conn = session.conn;
|
local conn = session.conn;
|
||||||
|
@ -490,6 +525,7 @@ function handle_resume(session, stanza, xmlns_sm)
|
||||||
:tag("item-not-found", { xmlns = xmlns_errors })
|
:tag("item-not-found", { xmlns = xmlns_errors })
|
||||||
);
|
);
|
||||||
old_session_registry:set(session.username, id, nil);
|
old_session_registry:set(session.username, id, nil);
|
||||||
|
resumption_expired(1);
|
||||||
else
|
else
|
||||||
session.log("debug", "Tried to resume non-existent session with id %s", id);
|
session.log("debug", "Tried to resume non-existent session with id %s", id);
|
||||||
session.send(st.stanza("failed", { xmlns = xmlns_sm })
|
session.send(st.stanza("failed", { xmlns = xmlns_sm })
|
||||||
|
@ -504,6 +540,12 @@ function handle_resume(session, stanza, xmlns_sm)
|
||||||
elseif session.hibernating then
|
elseif session.hibernating then
|
||||||
original_session.log("error", "Hibernating session has no watchdog!")
|
original_session.log("error", "Hibernating session has no watchdog!")
|
||||||
end
|
end
|
||||||
|
-- zero age = was not hibernating yet
|
||||||
|
local age = 0;
|
||||||
|
if original_session.hibernating then
|
||||||
|
local now = os_time();
|
||||||
|
age = now - original_session.hibernating;
|
||||||
|
end
|
||||||
session.log("debug", "mod_smacks resuming existing session %s...", get_session_id(original_session));
|
session.log("debug", "mod_smacks resuming existing session %s...", get_session_id(original_session));
|
||||||
original_session.log("debug", "mod_smacks session resumed from %s...", get_session_id(session));
|
original_session.log("debug", "mod_smacks session resumed from %s...", get_session_id(session));
|
||||||
-- TODO: All this should move to sessionmanager (e.g. session:replace(new_session))
|
-- TODO: All this should move to sessionmanager (e.g. session:replace(new_session))
|
||||||
|
@ -581,6 +623,7 @@ function handle_resume(session, stanza, xmlns_sm)
|
||||||
module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()});
|
module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()});
|
||||||
original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption
|
original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption
|
||||||
request_ack_now_if_needed(original_session, true, "handle_resume", nil);
|
request_ack_now_if_needed(original_session, true, "handle_resume", nil);
|
||||||
|
resumption_age:sample(age);
|
||||||
end
|
end
|
||||||
return true;
|
return true;
|
||||||
end
|
end
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue