mirror of
https://github.com/bjc/prosody.git
synced 2025-04-03 21:27:38 +03:00
Merge 0.9->0.10
This commit is contained in:
commit
6116ba8cfb
4 changed files with 50 additions and 32 deletions
14
net/dns.lua
14
net/dns.lua
|
@ -754,17 +754,17 @@ function resolver:query(qname, qtype, qclass) -- - - - - - - - - - -- query
|
|||
self.active[id] = self.active[id] or {};
|
||||
self.active[id][question] = o;
|
||||
|
||||
-- remember which coroutine wants the answer
|
||||
if co then
|
||||
set(self.wanted, qclass, qtype, qname, co, true);
|
||||
end
|
||||
|
||||
local conn, err = self:getsocket(o.server)
|
||||
if not conn then
|
||||
return nil, err;
|
||||
end
|
||||
conn:send (o.packet)
|
||||
|
||||
-- remember which coroutine wants the answer
|
||||
if co then
|
||||
set(self.wanted, qclass, qtype, qname, co, true);
|
||||
end
|
||||
|
||||
if timer and self.timeout then
|
||||
local num_servers = #self.server;
|
||||
local i = 1;
|
||||
|
@ -861,7 +861,7 @@ function resolver:receive(rset) -- - - - - - - - - - - - - - - - - receive
|
|||
-- retire the query
|
||||
local queries = self.active[response.header.id];
|
||||
queries[response.question.raw] = nil;
|
||||
|
||||
|
||||
if not next(queries) then self.active[response.header.id] = nil; end
|
||||
if not next(self.active) then self:closeall(); end
|
||||
|
||||
|
@ -875,7 +875,7 @@ function resolver:receive(rset) -- - - - - - - - - - - - - - - - - receive
|
|||
set(self.wanted, q.class, q.type, q.name, nil);
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -50,7 +50,7 @@ if prosody then
|
|||
local settings = config_get("*", "network_settings") or {};
|
||||
if use_luaevent then
|
||||
local event_settings = {
|
||||
ACCEPT_DELAY = settings.event_accept_retry_interval;
|
||||
ACCEPT_DELAY = settings.accept_retry_interval;
|
||||
ACCEPT_QUEUE = settings.tcp_backlog;
|
||||
CLEAR_DELAY = settings.event_clear_interval;
|
||||
CONNECT_TIMEOUT = settings.connect_timeout;
|
||||
|
|
|
@ -89,6 +89,7 @@ local _socketlist
|
|||
local _closelist
|
||||
local _readtimes
|
||||
local _writetimes
|
||||
local _fullservers
|
||||
|
||||
--// simple data types //--
|
||||
|
||||
|
@ -103,6 +104,7 @@ local _readtraffic
|
|||
local _selecttimeout
|
||||
local _sleeptime
|
||||
local _tcpbacklog
|
||||
local _accepretry
|
||||
|
||||
local _starttime
|
||||
local _currenttime
|
||||
|
@ -131,6 +133,7 @@ _socketlist = { } -- key = socket, value = wrapped socket (handlers)
|
|||
_readtimes = { } -- key = handler, value = timestamp of last data reading
|
||||
_writetimes = { } -- key = handler, value = timestamp of last data writing/sending
|
||||
_closelist = { } -- handlers to close
|
||||
_fullservers = { } -- servers in a paused state while there are too many clients
|
||||
|
||||
_readlistlen = 0 -- length of readlist
|
||||
_sendlistlen = 0 -- length of sendlist
|
||||
|
@ -142,6 +145,7 @@ _readtraffic = 0
|
|||
_selecttimeout = 1 -- timeout of socket.select
|
||||
_sleeptime = 0 -- time to wait at the end of every loop
|
||||
_tcpbacklog = 128 -- some kind of hint to the OS
|
||||
_accepretry = 10 -- seconds to wait until the next attempt of a full server to accept
|
||||
|
||||
_maxsendlen = 51000 * 1024 -- max len of send buffer
|
||||
_maxreadlen = 25000 * 1024 -- max len of read buffer
|
||||
|
@ -210,6 +214,7 @@ wrapserver = function( listeners, socket, ip, serverport, pattern, sslctx ) -- t
|
|||
socket = nil;
|
||||
end
|
||||
handler.paused = true;
|
||||
out_put("server.lua: server [", ip, "]:", serverport, " paused")
|
||||
end
|
||||
end
|
||||
handler.resume = function( )
|
||||
|
@ -220,7 +225,9 @@ wrapserver = function( listeners, socket, ip, serverport, pattern, sslctx ) -- t
|
|||
end
|
||||
_readlistlen = addsocket(_readlist, socket, _readlistlen)
|
||||
_socketlist[ socket ] = handler
|
||||
_fullservers[ handler ] = nil
|
||||
handler.paused = false;
|
||||
out_put("server.lua: server [", ip, "]:", serverport, " resumed")
|
||||
end
|
||||
end
|
||||
handler.ip = function( )
|
||||
|
@ -235,6 +242,7 @@ wrapserver = function( listeners, socket, ip, serverport, pattern, sslctx ) -- t
|
|||
handler.readbuffer = function( )
|
||||
if _readlistlen >= _maxselectlen or _sendlistlen >= _maxselectlen then
|
||||
handler.pause( )
|
||||
_fullservers[ handler ] = _currenttime
|
||||
out_put( "server.lua: refused new client connection: server full" )
|
||||
return false
|
||||
end
|
||||
|
@ -253,6 +261,8 @@ wrapserver = function( listeners, socket, ip, serverport, pattern, sslctx ) -- t
|
|||
return;
|
||||
elseif err then -- maybe timeout or something else
|
||||
out_put( "server.lua: error with new client connection: ", tostring(err) )
|
||||
handler.pause( )
|
||||
_fullservers[ handler ] = _currenttime
|
||||
return false
|
||||
end
|
||||
end
|
||||
|
@ -265,6 +275,7 @@ wrapconnection = function( server, listeners, socket, ip, serverport, clientport
|
|||
out_error("server.lua: Disallowed FD number: "..socket:getfd()) -- PROTIP: Switch to libevent
|
||||
socket:close( ) -- Should we send some kind of error here?
|
||||
if server then
|
||||
_fullservers[ server ] = _currenttime
|
||||
server.pause( )
|
||||
end
|
||||
return nil, nil, "fd-too-large"
|
||||
|
@ -806,6 +817,7 @@ getsettings = function( )
|
|||
max_connections = _maxselectlen;
|
||||
max_ssl_handshake_roundtrips = _maxsslhandshake;
|
||||
highest_allowed_fd = _maxfd;
|
||||
accept_retry_interval = _accepretry;
|
||||
}
|
||||
end
|
||||
|
||||
|
@ -821,6 +833,7 @@ changesettings = function( new )
|
|||
_tcpbacklog = tonumber( new.tcp_backlog ) or _tcpbacklog
|
||||
_sendtimeout = tonumber( new.send_timeout ) or _sendtimeout
|
||||
_readtimeout = tonumber( new.read_timeout ) or _readtimeout
|
||||
_accepretry = tonumber( new.accept_retry_interval ) or _accepretry
|
||||
_maxselectlen = new.max_connections or _maxselectlen
|
||||
_maxsslhandshake = new.max_ssl_handshake_roundtrips or _maxsslhandshake
|
||||
_maxfd = new.highest_allowed_fd or _maxfd
|
||||
|
@ -911,6 +924,13 @@ loop = function(once) -- this is the main loop of the program
|
|||
next_timer_time = next_timer_time - (_currenttime - _timer);
|
||||
end
|
||||
|
||||
for server, paused_time in pairs( _fullservers ) do
|
||||
if _currenttime - paused_time > _accepretry then
|
||||
_fullservers[ server ] = nil;
|
||||
server.resume();
|
||||
end
|
||||
end
|
||||
|
||||
-- wait some time (0 by default)
|
||||
socket_sleep( _sleeptime )
|
||||
until quitting;
|
||||
|
|
|
@ -18,31 +18,13 @@ local socket = require "socket";
|
|||
local adns = require "net.adns";
|
||||
local dns = require "net.dns";
|
||||
local t_insert, t_sort, ipairs = table.insert, table.sort, ipairs;
|
||||
local local_addresses = require "util.net".local_addresses;
|
||||
|
||||
local s2s_destroy_session = require "core.s2smanager".destroy_session;
|
||||
|
||||
local log = module._log;
|
||||
|
||||
local anysource = { IPv4 = "0.0.0.0", IPv6 = "::" };
|
||||
local function get_sources(addrs)
|
||||
local sources = {};
|
||||
for _, IP in ipairs(addrs) do
|
||||
local sock;
|
||||
if IP.proto == "IPv4" then
|
||||
sock = socket.udp();
|
||||
elseif IP.proto == "IPv6" then
|
||||
sock = socket.udp6();
|
||||
end
|
||||
sock:setpeername(IP.addr, 9);
|
||||
local localaddr = sock:getsockname() or anysource[IP.proto];
|
||||
sock:close();
|
||||
if not sources[localaddr] then
|
||||
sources[localaddr] = true;
|
||||
t_insert(sources, new_ip(localaddr, IP.proto));
|
||||
end
|
||||
end
|
||||
return sources;
|
||||
end
|
||||
local sources = {};
|
||||
local has_ipv4, has_ipv6;
|
||||
|
||||
local dns_timeout = module:get_option_number("dns_timeout", 15);
|
||||
|
@ -196,7 +178,7 @@ function s2sout.try_connect(host_session, connect_host, connect_port, err)
|
|||
|
||||
if have_other_result then
|
||||
if #IPs > 0 then
|
||||
rfc6724_dest(host_session.ip_hosts, get_sources(host_session.ip_hosts));
|
||||
rfc6724_dest(host_session.ip_hosts, sources);
|
||||
for i = 1, #IPs do
|
||||
IPs[i] = {ip = IPs[i], port = connect_port};
|
||||
end
|
||||
|
@ -232,7 +214,7 @@ function s2sout.try_connect(host_session, connect_host, connect_port, err)
|
|||
|
||||
if have_other_result then
|
||||
if #IPs > 0 then
|
||||
rfc6724_dest(host_session.ip_hosts, get_sources(host_session.ip_hosts));
|
||||
rfc6724_dest(host_session.ip_hosts, sources);
|
||||
for i = 1, #IPs do
|
||||
IPs[i] = {ip = IPs[i], port = connect_port};
|
||||
end
|
||||
|
@ -320,12 +302,28 @@ module:hook_global("service-added", function (event)
|
|||
return;
|
||||
end
|
||||
for source, _ in pairs(s2s_sources) do
|
||||
if source:find(":") then
|
||||
has_ipv6 = true;
|
||||
if source == "*" or source == "0.0.0.0" then
|
||||
for _, addr in ipairs(local_addresses("ipv4", true)) do
|
||||
sources[#sources + 1] = new_ip(addr, "IPv4");
|
||||
end
|
||||
elseif source == "::" then
|
||||
for _, addr in ipairs(local_addresses("ipv6", true)) do
|
||||
sources[#sources + 1] = new_ip(addr, "IPv6");
|
||||
end
|
||||
else
|
||||
sources[#sources + 1] = new_ip(source, (source:find(":") and "IPv6") or "IPv4");
|
||||
end
|
||||
end
|
||||
for i = 1,#sources do
|
||||
if sources[i].proto == "IPv6" then
|
||||
has_ipv6 = true;
|
||||
elseif sources[i].proto == "IPv4" then
|
||||
has_ipv4 = true;
|
||||
end
|
||||
end
|
||||
if not (has_ipv4 or has_ipv6) then
|
||||
module:log("warn", "No local IPv4 or IPv6 addresses detected, outgoing connections may fail");
|
||||
end
|
||||
end);
|
||||
|
||||
return s2sout;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue