mirror of
https://github.com/bjc/prosody.git
synced 2025-04-01 20:27:39 +03:00
Many of these fall into a few categories: - util.cache size, must be >= 1 - byte or item counts that logically can't be negative - port numbers that should be in 1..0xffff
288 lines
9.4 KiB
Lua
288 lines
9.4 KiB
Lua
-- Copyright (C) 2016-2020 Kim Alvefur
|
|
--
|
|
-- This project is MIT/X11 licensed. Please see the
|
|
-- COPYING file in the source package for more information.
|
|
--
|
|
|
|
module:depends"csi"
|
|
|
|
local jid = require "prosody.util.jid";
|
|
local st = require "prosody.util.stanza";
|
|
local dt = require "prosody.util.datetime";
|
|
local filters = require "prosody.util.filters";
|
|
local timer = require "prosody.util.timer";
|
|
|
|
local queue_size = module:get_option_integer("csi_queue_size", 256, 1);
|
|
local resume_delay = module:get_option_period("csi_resume_inactive_delay", 5);
|
|
|
|
local important_payloads = module:get_option_set("csi_important_payloads", { });
|
|
|
|
function is_important(stanza) --> boolean, reason: string
|
|
if stanza == " " then
|
|
return true, "whitespace keepalive";
|
|
elseif type(stanza) == "string" then
|
|
return true, "raw data";
|
|
elseif not st.is_stanza(stanza) then
|
|
-- This should probably never happen
|
|
return true, type(stanza);
|
|
end
|
|
if stanza.attr.xmlns ~= nil then
|
|
-- stream errors, stream management etc
|
|
return true, "nonza";
|
|
end
|
|
local st_name = stanza.name;
|
|
if not st_name then return false; end
|
|
local st_type = stanza.attr.type;
|
|
if st_name == "presence" then
|
|
if st_type == nil or st_type == "unavailable" or st_type == "error" then
|
|
return false, "presence update";
|
|
end
|
|
-- TODO Some MUC awareness, e.g. check for the 'this relates to you' status code
|
|
return true, "subscription request";
|
|
elseif st_name == "message" then
|
|
if st_type == "headline" then
|
|
-- Headline messages are ephemeral by definition
|
|
return false, "headline";
|
|
end
|
|
if st_type == "error" then
|
|
return true, "delivery failure";
|
|
end
|
|
if stanza:get_child("sent", "urn:xmpp:carbons:2") then
|
|
return true, "carbon";
|
|
end
|
|
local forwarded = stanza:find("{urn:xmpp:carbons:2}received/{urn:xmpp:forward:0}/{jabber:client}message");
|
|
if forwarded then
|
|
stanza = forwarded;
|
|
end
|
|
if stanza:get_child("body") then
|
|
return true, "body";
|
|
end
|
|
if stanza:get_child("subject") then
|
|
-- Last step of a MUC join
|
|
return true, "subject";
|
|
end
|
|
if stanza:get_child("encryption", "urn:xmpp:eme:0") then
|
|
-- Since we can't know what an encrypted message contains, we assume it's important
|
|
-- XXX Experimental XEP
|
|
return true, "encrypted";
|
|
end
|
|
if stanza:get_child("x", "jabber:x:conference") or stanza:find("{http://jabber.org/protocol/muc#user}x/invite") then
|
|
return true, "invite";
|
|
end
|
|
if stanza:get_child(nil, "urn:xmpp:jingle-message:0") or stanza:get_child(nil, "urn:xmpp:jingle-message:1") then
|
|
-- XXX Experimental XEP
|
|
return true, "jingle call";
|
|
end
|
|
for important in important_payloads do
|
|
if stanza:find(important) then
|
|
return true;
|
|
end
|
|
end
|
|
return false;
|
|
elseif st_name == "iq" then
|
|
return true;
|
|
end
|
|
end
|
|
|
|
module:hook("csi-is-stanza-important", function (event)
|
|
local important, why = is_important(event.stanza);
|
|
event.reason = why;
|
|
return important;
|
|
end, -1);
|
|
|
|
local function should_flush(stanza, session, ctr) --> boolean, reason: string
|
|
if ctr >= queue_size then
|
|
return true, "queue size limit reached";
|
|
end
|
|
local event = { stanza = stanza, session = session };
|
|
local ret = module:fire_event("csi-is-stanza-important", event)
|
|
return ret, event.reason;
|
|
end
|
|
|
|
local function with_timestamp(stanza, from)
|
|
if st.is_stanza(stanza) and stanza.attr.xmlns == nil and stanza.name ~= "iq" then
|
|
stanza = st.clone(stanza);
|
|
stanza:add_direct_child(st.stanza("delay", {xmlns = "urn:xmpp:delay", from = from, stamp = dt.datetime()}));
|
|
end
|
|
return stanza;
|
|
end
|
|
|
|
local measure_buffer_hold = module:measure("buffer_hold", "times",
|
|
{ buckets = { 0.1; 1; 5; 10; 15; 30; 60; 120; 180; 300; 600; 900 } });
|
|
|
|
local flush_reasons = module:metric(
|
|
"counter", "flushes", "",
|
|
"CSI queue flushes",
|
|
{ "reason" }
|
|
);
|
|
|
|
local flush_sizes = module:metric("histogram", "flush_stanza_count", "", "Number of stanzas flushed at once", {},
|
|
{ buckets = { 0, 1, 2, 4, 8, 16, 32, 64, 128, 256 } }):with_labels();
|
|
|
|
local function manage_buffer(stanza, session)
|
|
local ctr = session.csi_counter or 0;
|
|
if session.state ~= "inactive" then
|
|
session.csi_counter = ctr + 1;
|
|
return stanza;
|
|
end
|
|
local flush, why = should_flush(stanza, session, ctr);
|
|
if flush then
|
|
if session.csi_measure_buffer_hold then
|
|
session.csi_measure_buffer_hold();
|
|
session.csi_measure_buffer_hold = nil;
|
|
end
|
|
flush_reasons:with_labels(why or "important"):add(1);
|
|
flush_sizes:sample(ctr);
|
|
session.log("debug", "Flushing buffer (%s; queue size is %d)", why or "important", session.csi_counter);
|
|
session.state = "flushing";
|
|
module:fire_event("csi-flushing", { session = session });
|
|
session.conn:resume_writes();
|
|
else
|
|
session.log("debug", "Holding buffer (%s; queue size is %d)", why or "unimportant", session.csi_counter);
|
|
stanza = with_timestamp(stanza, jid.join(session.username, session.host))
|
|
end
|
|
session.csi_counter = ctr + 1;
|
|
return stanza;
|
|
end
|
|
|
|
local function flush_buffer(data, session)
|
|
local ctr = session.csi_counter or 0;
|
|
if ctr == 0 or session.state ~= "inactive" then return data end
|
|
session.log("debug", "Flushing buffer (%s; queue size is %d)", "client activity", session.csi_counter);
|
|
session.state = "flushing";
|
|
module:fire_event("csi-flushing", { session = session });
|
|
flush_sizes:sample(ctr);
|
|
flush_reasons:with_labels("client activity"):add(1);
|
|
if session.csi_measure_buffer_hold then
|
|
session.csi_measure_buffer_hold();
|
|
session.csi_measure_buffer_hold = nil;
|
|
end
|
|
session.conn:resume_writes();
|
|
return data;
|
|
end
|
|
|
|
function enable_optimizations(session)
|
|
if session.conn and session.conn.pause_writes then
|
|
session.conn:pause_writes();
|
|
session.csi_measure_buffer_hold = measure_buffer_hold();
|
|
session.csi_counter = 0;
|
|
if session.csi_resume then
|
|
timer.stop(session.csi_resume);
|
|
session.csi_resume = nil;
|
|
end
|
|
filters.add_filter(session, "stanzas/out", manage_buffer);
|
|
filters.add_filter(session, "bytes/in", flush_buffer);
|
|
else
|
|
session.log("warn", "Session connection does not support write pausing");
|
|
end
|
|
end
|
|
|
|
function disable_optimizations(session)
|
|
filters.remove_filter(session, "stanzas/out", manage_buffer);
|
|
filters.remove_filter(session, "bytes/in", flush_buffer);
|
|
session.csi_counter = nil;
|
|
if session.csi_resume then
|
|
timer.stop(session.csi_resume);
|
|
session.csi_resume = nil;
|
|
end
|
|
if session.csi_measure_buffer_hold then
|
|
session.csi_measure_buffer_hold();
|
|
session.csi_measure_buffer_hold = nil;
|
|
end
|
|
if session.conn and session.conn.resume_writes then
|
|
session.conn:resume_writes();
|
|
end
|
|
end
|
|
|
|
module:hook("csi-client-inactive", function (event)
|
|
local session = event.origin;
|
|
enable_optimizations(session);
|
|
end);
|
|
|
|
module:hook("csi-client-active", function (event)
|
|
local session = event.origin;
|
|
disable_optimizations(session);
|
|
end);
|
|
|
|
module:hook("pre-resource-unbind", function (event)
|
|
local session = event.session;
|
|
disable_optimizations(session);
|
|
end, 1);
|
|
|
|
local function resume_optimizations(_, _, session)
|
|
if (session.state == "flushing" or session.state == "inactive") and session.conn and session.conn.pause_writes then
|
|
session.state = "inactive";
|
|
session.conn:pause_writes();
|
|
session.csi_measure_buffer_hold = measure_buffer_hold();
|
|
session.log("debug", "Buffer flushed, resuming inactive mode (queue size was %d)", session.csi_counter);
|
|
session.csi_counter = 0;
|
|
end
|
|
session.csi_resume = nil;
|
|
end
|
|
|
|
module:hook("c2s-ondrain", function (event)
|
|
local session = event.session;
|
|
if (session.state == "flushing" or session.state == "inactive") and session.conn and session.conn.pause_writes then
|
|
-- After flushing, remain in pseudo-flushing state for a moment to allow
|
|
-- some followup traffic, iq replies, smacks acks to be sent without having
|
|
-- to go back and forth between inactive and flush mode.
|
|
if not session.csi_resume then
|
|
session.csi_resume = timer.add_task(resume_delay, resume_optimizations, session);
|
|
end
|
|
-- Should further noise in this short grace period push back the delay?
|
|
-- Probably not great if the session can be kept in pseudo-active mode
|
|
-- indefinitely.
|
|
end
|
|
end);
|
|
|
|
function module.load()
|
|
for _, user_session in pairs(prosody.hosts[module.host].sessions) do
|
|
for _, session in pairs(user_session.sessions) do
|
|
if session.state == "inactive" then
|
|
enable_optimizations(session);
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
function module.unload()
|
|
for _, user_session in pairs(prosody.hosts[module.host].sessions) do
|
|
for _, session in pairs(user_session.sessions) do
|
|
if session.state and session.state ~= "active" then
|
|
disable_optimizations(session);
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
function module.command(arg)
|
|
if arg[1] ~= "test" then
|
|
print("Usage: "..module.name.." test < test-stream.xml")
|
|
print("");
|
|
print("Provide a series of stanzas to test against importance algorithm");
|
|
return 1;
|
|
end
|
|
-- luacheck: ignore 212/self
|
|
local xmppstream = require "prosody.util.xmppstream";
|
|
local input_session = { notopen = true }
|
|
local stream_callbacks = { stream_ns = "jabber:client", default_ns = "jabber:client" };
|
|
function stream_callbacks:handlestanza(stanza)
|
|
local important, because = is_important(stanza);
|
|
print("--");
|
|
print(stanza:indent(nil, " "));
|
|
-- :pretty_print() maybe?
|
|
if important then
|
|
print((because or "unspecified reason").. " -> important");
|
|
else
|
|
print((because or "unspecified reason").. " -> unimportant");
|
|
end
|
|
end
|
|
local input_stream = xmppstream.new(input_session, stream_callbacks);
|
|
input_stream:reset();
|
|
input_stream:feed(st.stanza("stream", { xmlns = "jabber:client" }):top_tag());
|
|
input_session.notopen = nil;
|
|
|
|
for line in io.lines() do
|
|
input_stream:feed(line);
|
|
end
|
|
end
|