mod_c2s: Add session:sleep() and session:wake() to pause a session (e.g. while waiting for an external event). Needs a gallon or two of testing.

This commit is contained in:
Matthew Wild 2013-08-09 11:10:22 +01:00
parent 96466999c1
commit 5383602429

View file

@ -18,6 +18,8 @@ local uuid_generate = require "util.uuid".generate;
local xpcall, tostring, type = xpcall, tostring, type;
local traceback = debug.traceback;
local t_insert, t_remove = table.insert, table.remove;
local co_running, co_resume = coroutine.running, coroutine.resume;
local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
@ -31,7 +33,7 @@ local sessions = module:shared("sessions");
local core_process_stanza = prosody.core_process_stanza;
local hosts = prosody.hosts;
local stream_callbacks = { default_ns = "jabber:client", handlestanza = core_process_stanza };
local stream_callbacks = { default_ns = "jabber:client" };
local listener = {};
--- Stream events handlers
@ -120,9 +122,7 @@ end
local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end
function stream_callbacks.handlestanza(session, stanza)
stanza = session.filter("stanzas/in", stanza);
if stanza then
return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
end
t_insert(session.pending_stanzas, stanza);
end
--- Session methods
@ -224,17 +224,64 @@ function listener.onconnect(conn)
session.stream:reset();
end
session.thread = coroutine.create(function (stanza)
while true do
core_process_stanza(session, stanza);
stanza = coroutine.yield("ready");
end
end);
session.pending_stanzas = {};
local filter = session.filter;
function session.data(data)
-- Parse the data, which will store stanzas in session.pending_stanzas
if data then
data = filter("bytes/in", data);
if data then
local ok, err = stream:feed(data);
if ok then return; end
if not ok then
log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
session:close("not-well-formed");
end
end
end
if co_running() ~= session.thread and not session.paused then
if session.state == "wait" then
session.state = "ready";
local ok, state = co_resume(session.thread);
if not ok then
log("error", "Traceback[c2s]: %s", state);
elseif state == "wait" then
return;
end
end
-- We're not currently running, so start the thread to process pending stanzas
local s, thread = session.pending_stanzas, session.thread;
local n = #s;
while n > 0 and session.state ~= "wait" do
session.log("debug", "processing %d stanzas", n);
local consumed;
for i = 1,n do
local stanza = s[i];
local ok, state = co_resume(thread, stanza);
if not ok then
log("error", "Traceback[c2s]: %s", state);
elseif state == "wait" then
consumed = i;
session.state = "wait";
break;
end
end
if not consumed then consumed = n; end
for i = 1, #s do
s[i] = s[consumed+i];
end
n = #s;
end
end
end
if c2s_timeout then
add_task(c2s_timeout, function ()
@ -245,6 +292,22 @@ function listener.onconnect(conn)
end
session.dispatch_stanza = stream_callbacks.handlestanza;
function session:sleep(by)
session.log("debug", "Sleeping for %s", by);
session.paused = by or "?";
session.conn:pause();
if co_running() == session.thread then
coroutine.yield("wait");
end
end
function session:wake(by)
assert(session.paused == (by or "?"));
session.log("debug", "Waking for %s", by);
session.paused = nil;
session.conn:resume();
session.data(); --FIXME: next tick?
end
end
function listener.onincoming(conn, data)