mirror of
https://github.com/bjc/prosody.git
synced 2025-04-03 21:27:38 +03:00
mod_csi_simple: Remove old "pump" queue/buffer method, handled in net.server now
This commit is contained in:
parent
e8f72c6d4f
commit
6c89a86e0d
1 changed files with 2 additions and 61 deletions
|
@ -9,42 +9,8 @@ module:depends"csi"
|
|||
local jid = require "util.jid";
|
||||
local st = require "util.stanza";
|
||||
local dt = require "util.datetime";
|
||||
local new_queue = require "util.queue".new;
|
||||
local filters = require "util.filters";
|
||||
|
||||
local function new_pump(output, ...)
|
||||
-- luacheck: ignore 212/self
|
||||
local q = new_queue(...);
|
||||
local flush = true;
|
||||
function q:pause()
|
||||
flush = false;
|
||||
end
|
||||
function q:resume()
|
||||
flush = true;
|
||||
return q:flush();
|
||||
end
|
||||
local push = q.push;
|
||||
function q:push(item)
|
||||
local ok = push(self, item);
|
||||
if not ok then
|
||||
q:flush();
|
||||
output(item, self);
|
||||
elseif flush then
|
||||
return q:flush();
|
||||
end
|
||||
return true;
|
||||
end
|
||||
function q:flush()
|
||||
local item = self:pop();
|
||||
while item do
|
||||
output(item, self);
|
||||
item = self:pop();
|
||||
end
|
||||
return true;
|
||||
end
|
||||
return q;
|
||||
end
|
||||
|
||||
local queue_size = module:get_option_number("csi_queue_size", 256);
|
||||
|
||||
module:hook("csi-is-stanza-important", function (event)
|
||||
|
@ -109,45 +75,20 @@ local function flush_buffer(data, session)
|
|||
return data;
|
||||
end
|
||||
|
||||
local function flush_pump(data, session)
|
||||
session.pump:flush();
|
||||
return data;
|
||||
end
|
||||
|
||||
module:hook("csi-client-inactive", function (event)
|
||||
local session = event.origin;
|
||||
if session.conn and session.conn and session.conn.pause_writes then
|
||||
session.log("info", "Native net.server buffer management mode");
|
||||
session.conn:pause_writes();
|
||||
filters.add_filter(session, "stanzas/out", manage_buffer);
|
||||
filters.add_filter(session, "bytes/in", flush_buffer);
|
||||
elseif session.pump then
|
||||
session.pump:pause();
|
||||
else
|
||||
local bare_jid = jid.join(session.username, session.host);
|
||||
local send = session.send;
|
||||
session._orig_send = send;
|
||||
local pump = new_pump(session.send, queue_size);
|
||||
pump:pause();
|
||||
session.pump = pump;
|
||||
filters.add_filter(session, "bytes/in", flush_pump);
|
||||
function session.send(stanza)
|
||||
if session.state == "active" or module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then
|
||||
pump:flush();
|
||||
send(stanza);
|
||||
else
|
||||
pump:push(with_timestamp(stanza, bare_jid));
|
||||
end
|
||||
return true;
|
||||
end
|
||||
session.log("warn", "Session connection does not support write pausing");
|
||||
end
|
||||
end);
|
||||
|
||||
module:hook("csi-client-active", function (event)
|
||||
local session = event.origin;
|
||||
if session.pump then
|
||||
session.pump:resume();
|
||||
elseif session.conn and session.conn and session.conn.resume_writes then
|
||||
if session.conn and session.conn and session.conn.resume_writes then
|
||||
filters.remove_filter(session, "stanzas/out", manage_buffer);
|
||||
filters.remove_filter(session, "bytes/in", flush_buffer);
|
||||
session.conn:resume_writes();
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue