mirror of
https://github.com/bjc/prosody.git
synced 2025-04-04 05:37:39 +03:00
mod_admin_socket, util.adminstream: New module to manage a local unix domain socket for admin functionality
This commit is contained in:
parent
6ccd66e347
commit
4c6992a00e
2 changed files with 354 additions and 0 deletions
69
plugins/mod_admin_socket.lua
Normal file
69
plugins/mod_admin_socket.lua
Normal file
|
@ -0,0 +1,69 @@
|
|||
module:set_global();
|
||||
|
||||
local have_unix, unix = pcall(require, "socket.unix");
|
||||
|
||||
if not have_unix or type(unix) ~= "table" then
|
||||
module:log_status("error", "LuaSocket unix socket support not available or incompatible, ensure it is up to date");
|
||||
return;
|
||||
end
|
||||
|
||||
local server = require "net.server";
|
||||
|
||||
local adminstream = require "util.adminstream";
|
||||
|
||||
local socket_path = module:get_option_string("admin_socket", prosody.paths.data.."/prosody.sock");
|
||||
|
||||
local sessions = module:shared("sessions");
|
||||
|
||||
local function fire_admin_event(session, stanza)
|
||||
local event_data = {
|
||||
origin = session, stanza = stanza;
|
||||
};
|
||||
local event_name;
|
||||
if stanza.attr.xmlns then
|
||||
event_name = "admin/"..stanza.attr.xmlns..":"..stanza.name;
|
||||
else
|
||||
event_name = "admin/"..stanza.name;
|
||||
end
|
||||
module:log("debug", "Firing %s", event_name);
|
||||
return module:fire_event(event_name, event_data);
|
||||
end
|
||||
|
||||
module:hook("server-stopping", function ()
|
||||
for _, session in pairs(sessions) do
|
||||
session:close("system-shutdown");
|
||||
end
|
||||
os.remove(socket_path);
|
||||
end);
|
||||
|
||||
--- Unix domain socket management
|
||||
|
||||
local conn, sock;
|
||||
|
||||
local listeners = adminstream.server(sessions, fire_admin_event).listeners;
|
||||
|
||||
local function accept_connection()
|
||||
module:log("debug", "accepting...");
|
||||
local client = sock:accept();
|
||||
if not client then return; end
|
||||
server.wrapclient(client, "unix", 0, listeners, "*a");
|
||||
end
|
||||
|
||||
function module.load()
|
||||
sock = unix.stream();
|
||||
sock:settimeout(0);
|
||||
os.remove(socket_path);
|
||||
assert(sock:bind(socket_path));
|
||||
assert(sock:listen());
|
||||
conn = server.watchfd(sock:getfd(), accept_connection);
|
||||
end
|
||||
|
||||
function module.unload()
|
||||
if conn then
|
||||
conn:close();
|
||||
end
|
||||
if sock then
|
||||
sock:close();
|
||||
end
|
||||
os.remove(socket_path);
|
||||
end
|
285
util/adminstream.lua
Normal file
285
util/adminstream.lua
Normal file
|
@ -0,0 +1,285 @@
|
|||
local st = require "util.stanza";
|
||||
local new_xmpp_stream = require "util.xmppstream".new;
|
||||
local sessionlib = require "util.session";
|
||||
local gettime = require "util.time".now;
|
||||
local runner = require "util.async".runner;
|
||||
local add_task = require "util.timer".add_task;
|
||||
local events = require "util.events";
|
||||
|
||||
local stream_close_timeout = 5;
|
||||
|
||||
local log = require "util.logger".init("adminstream");
|
||||
|
||||
local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
|
||||
|
||||
local stream_callbacks = { default_ns = "xmpp:prosody.im/admin" };
|
||||
|
||||
function stream_callbacks.streamopened(session, attr)
|
||||
-- run _streamopened in async context
|
||||
session.thread:run({ stream = "opened", attr = attr });
|
||||
end
|
||||
|
||||
function stream_callbacks._streamopened(session, attr) --luacheck: ignore 212/attr
|
||||
if session.type ~= "client" then
|
||||
session:open_stream();
|
||||
end
|
||||
session.notopen = nil;
|
||||
end
|
||||
|
||||
function stream_callbacks.streamclosed(session, attr)
|
||||
-- run _streamclosed in async context
|
||||
session.thread:run({ stream = "closed", attr = attr });
|
||||
end
|
||||
|
||||
function stream_callbacks._streamclosed(session)
|
||||
session.log("debug", "Received </stream:stream>");
|
||||
session:close(false);
|
||||
end
|
||||
|
||||
function stream_callbacks.error(session, error, data)
|
||||
if error == "no-stream" then
|
||||
session.log("debug", "Invalid opening stream header (%s)", (data:gsub("^([^\1]+)\1", "{%1}")));
|
||||
session:close("invalid-namespace");
|
||||
elseif error == "parse-error" then
|
||||
session.log("debug", "Client XML parse error: %s", data);
|
||||
session:close("not-well-formed");
|
||||
elseif error == "stream-error" then
|
||||
local condition, text = "undefined-condition";
|
||||
for child in data:childtags(nil, xmlns_xmpp_streams) do
|
||||
if child.name ~= "text" then
|
||||
condition = child.name;
|
||||
else
|
||||
text = child:get_text();
|
||||
end
|
||||
if condition ~= "undefined-condition" and text then
|
||||
break;
|
||||
end
|
||||
end
|
||||
text = condition .. (text and (" ("..text..")") or "");
|
||||
session.log("info", "Session closed by remote with error: %s", text);
|
||||
session:close(nil, text);
|
||||
end
|
||||
end
|
||||
|
||||
function stream_callbacks.handlestanza(session, stanza)
|
||||
session.thread:run(stanza);
|
||||
end
|
||||
|
||||
local runner_callbacks = {};
|
||||
|
||||
function runner_callbacks:error(err)
|
||||
self.data.log("error", "Traceback[c2s]: %s", err);
|
||||
end
|
||||
|
||||
local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
|
||||
|
||||
local function destroy_session(session, reason)
|
||||
if session.destroyed then return; end
|
||||
session.destroyed = true;
|
||||
session.log("debug", "Destroying session: %s", reason or "unknown reason");
|
||||
end
|
||||
|
||||
local function session_close(session, reason)
|
||||
local log = session.log or log;
|
||||
if session.conn then
|
||||
if session.notopen then
|
||||
session:open_stream();
|
||||
end
|
||||
if reason then -- nil == no err, initiated by us, false == initiated by client
|
||||
local stream_error = st.stanza("stream:error");
|
||||
if type(reason) == "string" then -- assume stream error
|
||||
stream_error:tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' });
|
||||
elseif type(reason) == "table" then
|
||||
if reason.condition then
|
||||
stream_error:tag(reason.condition, stream_xmlns_attr):up();
|
||||
if reason.text then
|
||||
stream_error:tag("text", stream_xmlns_attr):text(reason.text):up();
|
||||
end
|
||||
if reason.extra then
|
||||
stream_error:add_child(reason.extra);
|
||||
end
|
||||
elseif reason.name then -- a stanza
|
||||
stream_error = reason;
|
||||
end
|
||||
end
|
||||
stream_error = tostring(stream_error);
|
||||
log("debug", "Disconnecting client, <stream:error> is: %s", stream_error);
|
||||
session.send(stream_error);
|
||||
end
|
||||
|
||||
session.send("</stream:stream>");
|
||||
function session.send() return false; end
|
||||
|
||||
local reason_text = (reason and (reason.name or reason.text or reason.condition)) or reason;
|
||||
session.log("debug", "c2s stream for %s closed: %s", session.full_jid or session.ip or "<unknown>", reason_text or "session closed");
|
||||
|
||||
-- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote
|
||||
local conn = session.conn;
|
||||
if reason_text == nil and not session.notopen and session.type == "c2s" then
|
||||
-- Grace time to process data from authenticated cleanly-closed stream
|
||||
add_task(stream_close_timeout, function ()
|
||||
if not session.destroyed then
|
||||
session.log("warn", "Failed to receive a stream close response, closing connection anyway...");
|
||||
destroy_session(session);
|
||||
conn:close();
|
||||
end
|
||||
end);
|
||||
else
|
||||
destroy_session(session, reason_text);
|
||||
conn:close();
|
||||
end
|
||||
else
|
||||
local reason_text = (reason and (reason.name or reason.text or reason.condition)) or reason;
|
||||
destroy_session(session, reason_text);
|
||||
end
|
||||
end
|
||||
|
||||
--- Public methods
|
||||
|
||||
local function new_server(sessions, stanza_handler)
|
||||
local listeners = {};
|
||||
|
||||
function listeners.onconnect(conn)
|
||||
log("debug", "New connection");
|
||||
local session = sessionlib.new("admin");
|
||||
sessionlib.set_id(session);
|
||||
sessionlib.set_logger(session);
|
||||
sessionlib.set_conn(session, conn);
|
||||
|
||||
session.conntime = gettime();
|
||||
session.type = "admin";
|
||||
|
||||
local stream = new_xmpp_stream(session, stream_callbacks);
|
||||
session.stream = stream;
|
||||
session.notopen = true;
|
||||
|
||||
session.thread = runner(function (stanza)
|
||||
if st.is_stanza(stanza) then
|
||||
stanza_handler(session, stanza);
|
||||
elseif stanza.stream == "opened" then
|
||||
stream_callbacks._streamopened(session, stanza.attr);
|
||||
elseif stanza.stream == "closed" then
|
||||
stream_callbacks._streamclosed(session, stanza.attr);
|
||||
end
|
||||
end, runner_callbacks, session);
|
||||
|
||||
function session.data(data)
|
||||
-- Parse the data, which will store stanzas in session.pending_stanzas
|
||||
if data then
|
||||
local ok, err = stream:feed(data);
|
||||
if not ok then
|
||||
session.log("debug", "Received invalid XML (%s) %d bytes: %q", err, #data, data:sub(1, 300));
|
||||
session:close("not-well-formed");
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
session.close = session_close;
|
||||
|
||||
session.send = function (t)
|
||||
session.log("debug", "Sending[%s]: %s", session.type, t.top_tag and t:top_tag() or t:match("^[^>]*>?"));
|
||||
return session.rawsend(tostring(t));
|
||||
end
|
||||
|
||||
function session.rawsend(t)
|
||||
local ret, err = conn:write(t);
|
||||
if not ret then
|
||||
session.log("debug", "Error writing to connection: %s", err);
|
||||
return false, err;
|
||||
end
|
||||
return true;
|
||||
end
|
||||
|
||||
sessions[conn] = session;
|
||||
end
|
||||
|
||||
function listeners.onincoming(conn, data)
|
||||
local session = sessions[conn];
|
||||
if session then
|
||||
session.data(data);
|
||||
end
|
||||
end
|
||||
|
||||
function listeners.ondisconnect(conn, err)
|
||||
local session = sessions[conn];
|
||||
if session then
|
||||
session.log("info", "Admin client disconnected: %s", err or "connection closed");
|
||||
session.conn = nil;
|
||||
sessions[conn] = nil;
|
||||
end
|
||||
end
|
||||
return {
|
||||
listeners = listeners;
|
||||
};
|
||||
end
|
||||
|
||||
local function new_client()
|
||||
local client = {
|
||||
type = "client";
|
||||
events = events.new();
|
||||
log = log;
|
||||
};
|
||||
|
||||
local listeners = {};
|
||||
|
||||
function listeners.onconnect(conn)
|
||||
log("debug", "Connected");
|
||||
client.conn = conn;
|
||||
|
||||
local stream = new_xmpp_stream(client, stream_callbacks);
|
||||
client.stream = stream;
|
||||
client.notopen = true;
|
||||
|
||||
client.thread = runner(function (stanza)
|
||||
if st.is_stanza(stanza) then
|
||||
client.events.fire_event("received", stanza);
|
||||
elseif stanza.stream == "opened" then
|
||||
stream_callbacks._streamopened(client, stanza.attr);
|
||||
client.events.fire_event("connected");
|
||||
elseif stanza.stream == "closed" then
|
||||
client.events.fire_event("disconnected");
|
||||
stream_callbacks._streamclosed(client, stanza.attr);
|
||||
end
|
||||
end, runner_callbacks, client);
|
||||
|
||||
client.close = session_close;
|
||||
|
||||
function client.send(t)
|
||||
client.log("debug", "Sending: %s", t.top_tag and t:top_tag() or t:match("^[^>]*>?"));
|
||||
return client.rawsend(tostring(t));
|
||||
end
|
||||
|
||||
function client.rawsend(t)
|
||||
local ret, err = conn:write(t);
|
||||
if not ret then
|
||||
client.log("debug", "Error writing to connection: %s", err);
|
||||
return false, err;
|
||||
end
|
||||
return true;
|
||||
end
|
||||
client.log("debug", "Opening stream...");
|
||||
client:open_stream();
|
||||
end
|
||||
|
||||
function listeners.onincoming(conn, data) --luacheck: ignore 212/conn
|
||||
local ok, err = client.stream:feed(data);
|
||||
if not ok then
|
||||
client.log("debug", "Received invalid XML (%s) %d bytes: %q", err, #data, data:sub(1, 300));
|
||||
client:close("not-well-formed");
|
||||
end
|
||||
end
|
||||
|
||||
function listeners.ondisconnect(conn, err) --luacheck: ignore 212/conn
|
||||
client.log("info", "Admin client disconnected: %s", err or "connection closed");
|
||||
client.conn = nil;
|
||||
end
|
||||
|
||||
client.listeners = listeners;
|
||||
|
||||
return client;
|
||||
end
|
||||
|
||||
return {
|
||||
server = new_server;
|
||||
client = new_client;
|
||||
};
|
Loading…
Add table
Add a link
Reference in a new issue