-- Mirror of https://github.com/bjc/prosody.git
-- Synced 2025-04-01 20:27:39 +03:00
-- 153 lines, 4.5 KiB, Lua
-- Because we deal with pre-authed sessions and streams we can't be host-specific
|
|
module:set_global();
|
|
|
|
local filters = require "prosody.util.filters";
|
|
local throttle = require "prosody.util.throttle";
|
|
local timer = require "prosody.util.timer";
|
|
local ceil = math.ceil;
|
|
|
|
-- Fallback values used when a configured rate or burst cannot be parsed.
local default_bytes_per_second, default_burst = 3000, 2;

-- Raw "limits" option: a table keyed by session type whose entries are read
-- for their `rate` and `burst` fields further down in this file.
local limits_cfg = module:get_option("limits", {});

-- Delay handed to timer.add_task before a paused session is re-checked
-- (presumably seconds, default 1 -- confirm against module:get_option_period).
local limits_resolution = module:get_option_period("limits_resolution", 1);
|
|
|
|
-- Decimal exponent applied for each unit prefix (first letter of the unit,
-- case-insensitive). Plain bytes use 10^0, so "100 b/s" means 100 bytes/s.
local rate_units = { b = 0, k = 3, m = 6, g = 9, t = 12 } -- Plan for the future.

--- Convert a configured rate string into bytes per second.
-- Accepted form: "<digits>[ ]<unit>/s", e.g. "3kb/s", "2 MB/s", "512 b/s".
-- Only the first letter of the unit is significant.
-- @param rate      the configured value (expected to be a string)
-- @param sess_type session type name, used only in the log message
-- @return bytes per second; default_bytes_per_second if `rate` is missing,
--         is not a string, or does not match the accepted form (an error is
--         logged in each of those cases)
local function parse_rate(rate, sess_type)
	local quantity, unit, exp;
	-- Only strings can be pattern-matched; any other type (for example a bare
	-- number in the config) is reported below instead of raising an error.
	if type(rate) == "string" then
		quantity, unit = rate:match("^(%d+) ?([^/]+)/s$");
		exp = quantity and rate_units[unit:sub(1,1):lower()];
	end
	if not exp then
		module:log("error", "Error parsing rate for %s: %q, using default rate (%d bytes/s)", sess_type, rate, default_bytes_per_second);
		return default_bytes_per_second;
	end
	-- quantity is a string of digits; convert explicitly rather than relying
	-- on arithmetic coercion.
	return tonumber(quantity) * (10 ^ exp);
end
|
|
|
|
--- Convert a configured burst value into a number of seconds.
-- Strings must look like "<digits>[ ]s" (e.g. "2s", "5 s"); other values are
-- passed through tonumber().
-- @param burst     the configured value (string, number or nil)
-- @param sess_type session type name, used only in the log message
-- @return the burst length, or default_burst when `burst` is absent or
--         cannot be parsed (the latter is logged as an error)
local function parse_burst(burst, sess_type)
	-- Keep the caller's value intact so that a failed match can still be
	-- detected and reported below.
	local seconds = burst;
	if type(burst) == "string" then
		seconds = burst:match("^(%d+) ?s$");
	end
	local n_burst = tonumber(seconds);
	if burst and not n_burst then
		module:log("error", "Unable to parse burst for %s: %q, using default burst interval (%ds)", sess_type, burst, default_burst);
	end
	return n_burst or default_burst;
end
|
|
|
|
-- Effective limits, keyed by session type:
--   limits = { c2s = { bytes_per_second = X, burst_seconds = Y } }
-- The entries below are the built-in defaults; anything present in the
-- configured table replaces the whole entry for that session type.
local limits = {
	c2s = { bytes_per_second = 10 * 1024, burst_seconds = 2 };
	s2sin = { bytes_per_second = 30 * 1024, burst_seconds = 2 };
};

for sess_type, sess_limits in pairs(limits_cfg) do
	local bytes_per_second = parse_rate(sess_limits.rate, sess_type);
	local burst_seconds = parse_burst(sess_limits.burst, sess_type);
	limits[sess_type] = {
		bytes_per_second = bytes_per_second,
		burst_seconds = burst_seconds,
	};
end
|
|
|
|
-- Filters applied to sessions that are throttled in Lua (see session.throttle).
local default_filter_set = {};

--- Incoming-bytes filter enforcing the session's throttle.
-- When the throttle reports that `bytes` exceeds the current allowance, the
-- connection is paused, only the allowed prefix is passed on, and the rest is
-- fed back through session.data() after `limits_resolution` has elapsed.
-- @param bytes   the chunk of data just read for this session
-- @param session the session; untouched if it has no `throttle` field
-- @return the portion of `bytes` to process now
function default_filter_set.bytes_in(bytes, session)
	local sess_throttle = session.throttle;
	if sess_throttle then
		-- The third result is treated as the amount by which this chunk
		-- exceeds the allowance.
		local ok, _, outstanding = sess_throttle:poll(#bytes, true);
		if not ok then
			session.log("debug", "Session over rate limit (%d) with %d (by %d), pausing", sess_throttle.max, #bytes, outstanding);
			outstanding = ceil(outstanding);
			session.conn:pause(); -- Read no more data from the connection until there is no outstanding data
			-- Split with forward indices: bytes:sub(-outstanding) would return
			-- the WHOLE string when outstanding is 0, so the same data would be
			-- processed now and again from the timer.
			local allowed = #bytes - outstanding;
			if allowed < 0 then
				allowed = 0;
			end
			local outstanding_data = bytes:sub(allowed + 1);
			bytes = bytes:sub(1, allowed);
			timer.add_task(limits_resolution, function ()
				-- The session may have been closed while we were waiting
				if not session.conn then return; end
				if sess_throttle:peek(#outstanding_data) then
					session.log("debug", "Resuming paused session");
					session.conn:resume();
				end
				-- Handle what we can of the outstanding data; it passes through
				-- this filter again, so it may be split and delayed once more.
				if #outstanding_data > 0 then
					session.data(outstanding_data);
				end
			end);
		end
	end
	return bytes;
end
|
|
|
|
-- Filter set to use for each throttled session type; all of them currently
-- share default_filter_set.
local type_filters = {};
for _, throttled_type in ipairs({ "c2s", "s2sin", "s2sout" }) do
	type_filters[throttled_type] = default_filter_set;
end
|
|
|
|
--- Filter hook: apply the configured limit to a session.
-- The session type is the part of session.type before the first "_"
-- (so "c2s_unauthed" is treated as "c2s"). If the connection offers
-- setlimit() the limit is handed to it; otherwise a throttle is attached to
-- the session and enforced by a "bytes/in" filter.
-- @param session the session being set up
local function filter_hook(session)
	local session_type = session.type:match("^[^_]+");
	local filter_set, opts = type_filters[session_type], limits[session_type];
	if not opts then
		return; -- No limits for this kind of session
	end
	if session.conn and session.conn.setlimit then
		session.conn:setlimit(opts.bytes_per_second);
		-- Currently no burst support
	elseif filter_set then
		-- The guard matters when limits are configured for a session type that
		-- has no entry in type_filters: indexing the missing filter set would
		-- otherwise raise an error here.
		session.throttle = throttle.create(opts.bytes_per_second * opts.burst_seconds, opts.burst_seconds);
		filters.add_filter(session, "bytes/in", filter_set.bytes_in, 1000);
	end
end
|
|
|
|
-- Load callback (presumably invoked when the module is loaded -- confirm
-- against the module API): registers filter_hook as a filter hook.
module.load = function ()
	filters.add_filter_hook(filter_hook);
end
|
|
|
|
-- Unload callback (presumably invoked when the module is unloaded -- confirm
-- against the module API): removes the filter hook registered by module.load.
module.unload = function ()
	filters.remove_filter_hook(filter_hook);
end
|
|
|
|
--- Lift rate limiting from a session.
-- On connections offering setlimit() the limit is set to 0; otherwise the
-- "bytes/in" filter for the session's type is removed and the session's
-- throttle is dropped.
-- NOTE(review): this is a non-local function; it is left that way in case
-- other code reaches it by name -- confirm before making it local.
-- @param session the session to exempt
function unlimited(session)
	local session_type = session.type:match("^[^_]+");
	local has_native_limit = session.conn and session.conn.setlimit;
	if not has_native_limit then
		local filter_set = type_filters[session_type];
		filters.remove_filter(session, "bytes/in", filter_set.bytes_in);
		session.throttle = nil;
		return;
	end
	session.conn:setlimit(0);
	-- Currently no burst support
end
|
|
|
|
--- Per-host setup: exempt the entries of the "unlimited_jids" option.
-- Nothing is hooked when the set is empty. Otherwise:
--   * "authentication-success": exempt if "<username>@<host>" is in the set
--   * "s2sout-established":     exempt if the session's to_host is in the set
--   * "s2sin-established":      exempt if the session's from_host is in the set
-- @param module the module object for the host being added
function module.add_host(module)
	local unlimited_jids = module:get_option_inherited_set("unlimited_jids", {});
	if unlimited_jids:empty() then
		return;
	end

	-- Shared by all three handlers; returns nothing, like the handlers.
	local function lift_if_listed(session, identity)
		if unlimited_jids:contains(identity) then
			unlimited(session);
		end
	end

	module:hook("authentication-success", function (event)
		local session = event.session;
		local jid = session.username .. "@" .. session.host;
		lift_if_listed(session, jid);
	end);

	module:hook("s2sout-established", function (event)
		local session = event.session;
		lift_if_listed(session, session.to_host);
	end);

	module:hook("s2sin-established", function (event)
		local session = event.session;
		-- from_host may be unset on incoming sessions
		if session.from_host then
			lift_if_listed(session, session.from_host);
		end
	end);
end
|