mirror of
https://github.com/bjc/prosody.git
synced 2025-04-03 21:27:38 +03:00
net.server_epoll: Improve efficiency of sending much buffered data
Problem: The string slice operations when a lot of data gets buffered ends up being expensive and memory-consuming. We have util.dbuffer for precisely this kind of thing. I want to keep the behavior of writebuffer being upgraded from nil to a string to full buffer since the last step involves three table allocations, where the previous buffer method only used one. Avoiding those allocations for simple writes like white space keep alive feels like it would keep memory churn down. This work was started in 2020
This commit is contained in:
parent
3b07918800
commit
693079c619
1 changed files with 68 additions and 34 deletions
|
@ -6,8 +6,6 @@
|
|||
--
|
||||
|
||||
|
||||
local t_insert = table.insert;
|
||||
local t_concat = table.concat;
|
||||
local setmetatable = setmetatable;
|
||||
local pcall = pcall;
|
||||
local type = type;
|
||||
|
@ -22,6 +20,7 @@ local realtime = require "prosody.util.time".now;
|
|||
local monotonic = require "prosody.util.time".monotonic;
|
||||
local indexedbheap = require "prosody.util.indexedbheap";
|
||||
local createtable = require "prosody.util.table".create;
|
||||
local dbuffer = require "prosody.util.dbuffer";
|
||||
local inet = require "prosody.util.net";
|
||||
local inet_pton = inet.pton;
|
||||
local _SOCKETINVALID = socket._SOCKETINVALID or -1;
|
||||
|
@ -94,6 +93,15 @@ local default_config = { __index = {
|
|||
-- Size of chunks to read from sockets
|
||||
read_size = 8192;
|
||||
|
||||
-- Maximum size of send buffer, after which additional data is rejected
|
||||
max_send_buffer_size = 32*1024*1024;
|
||||
|
||||
-- How many chunks (immutable strings) to keep in the send buffer
|
||||
send_buffer_chunks = nil;
|
||||
|
||||
-- Maximum amount of data to send at once (to the TCP buffers), default based on /proc/sys/net/ipv4/tcp_wmem
|
||||
max_send_chunk = 4*1024*1024;
|
||||
|
||||
-- Timeout used during between steps in TLS handshakes
|
||||
ssl_handshake_timeout = 60;
|
||||
|
||||
|
@ -533,26 +541,21 @@ function interface:onwritable()
|
|||
self:onconnect();
|
||||
if not self.conn then return nil, "no-conn"; end -- could have been closed in onconnect
|
||||
self:on("predrain");
|
||||
local buffer = self.writebuffer;
|
||||
local data = buffer or "";
|
||||
if type(buffer) == "table" then
|
||||
if buffer[3] then
|
||||
data = t_concat(data);
|
||||
elseif buffer[2] then
|
||||
data = buffer[1] .. buffer[2];
|
||||
else
|
||||
data = buffer[1] or "";
|
||||
end
|
||||
end
|
||||
local buffer = self.writebuffer or "";
|
||||
-- Naming things ... s/data/slice/ ?
|
||||
local data = buffer:sub(1, cfg.max_send_chunk);
|
||||
local ok, err, partial = self.conn:send(data);
|
||||
self._writable = ok;
|
||||
if ok then
|
||||
if ok and #data < #buffer then
|
||||
-- Sent the whole 'data' but there's more in the buffer
|
||||
ok, err, partial = nil, "timeout", ok;
|
||||
end
|
||||
self:debug("Sent %d out of %d buffered bytes", ok and #data or partial or 0, #buffer);
|
||||
if ok then -- all the data we had was sent successfully
|
||||
self:set(nil, false);
|
||||
if cfg.keep_buffers and type(buffer) == "table" then
|
||||
for i = #buffer, 1, -1 do
|
||||
buffer[i] = nil;
|
||||
end
|
||||
else
|
||||
buffer:discard(ok);
|
||||
else -- string or don't keep buffers
|
||||
self.writebuffer = nil;
|
||||
end
|
||||
self._writing = nil;
|
||||
|
@ -560,14 +563,10 @@ function interface:onwritable()
|
|||
self:ondrain(); -- Be aware of writes in ondrain
|
||||
return ok;
|
||||
elseif partial then
|
||||
self:debug("Sent %d out of %d buffered bytes", partial, #data);
|
||||
if cfg.keep_buffers and type(buffer) == "table" then
|
||||
buffer[1] = data:sub(partial+1);
|
||||
for i = #buffer, 2, -1 do
|
||||
buffer[i] = nil;
|
||||
end
|
||||
if type(buffer) == "table" then
|
||||
buffer:discard(partial);
|
||||
else
|
||||
self.writebuffer = data:sub(partial+1);
|
||||
self.writebuffer = data:sub(partial + 1);
|
||||
end
|
||||
self:set(nil, true);
|
||||
self:setwritetimeout();
|
||||
|
@ -595,13 +594,45 @@ end
|
|||
-- Add data to write buffer and set flag for wanting to write
|
||||
function interface:write(data)
|
||||
local buffer = self.writebuffer;
|
||||
if type(buffer) == "table" then
|
||||
t_insert(buffer, data);
|
||||
elseif type(buffer) == "string" then
|
||||
self:noise("Allocating buffer!")
|
||||
self.writebuffer = { buffer, data };
|
||||
elseif buffer == nil then
|
||||
-- (nil) -> save string
|
||||
-- (string) -> convert to buffer (3 tables!)
|
||||
-- (buffer) -> write to buffer
|
||||
if not buffer then
|
||||
self.writebuffer = data;
|
||||
elseif type(buffer) == "string" then
|
||||
local prev_buffer = buffer;
|
||||
buffer = dbuffer.new(cfg.max_send_buffer_size, cfg.send_buffer_chunks);
|
||||
self.writebuffer = buffer;
|
||||
if prev_buffer then
|
||||
-- TODO refactor, there's 3 copies of these lines
|
||||
if not buffer:write(prev_buffer) then
|
||||
if self._write_lock then
|
||||
return false;
|
||||
end
|
||||
-- Try to flush buffer to make room
|
||||
self:onwritable();
|
||||
if not buffer:write(prev_buffer) then
|
||||
return false;
|
||||
end
|
||||
end
|
||||
end
|
||||
if not buffer:write(data) then
|
||||
if self._write_lock then
|
||||
return false;
|
||||
end
|
||||
self:onwritable();
|
||||
if not buffer:write(data) then
|
||||
return false;
|
||||
end
|
||||
end
|
||||
elseif not buffer:write(data) then
|
||||
if self._write_lock then
|
||||
return false;
|
||||
end
|
||||
self:onwritable();
|
||||
if not buffer:write(data) then
|
||||
return false;
|
||||
end
|
||||
end
|
||||
if not self._write_lock and not self._writing then
|
||||
if self._writable and cfg.opportunistic_writes and not self._opportunistic_write then
|
||||
|
@ -619,7 +650,7 @@ interface.send = interface.write;
|
|||
|
||||
-- Close, possibly after writing is done
|
||||
function interface:close()
|
||||
if self._connected and self.writebuffer and (self.writebuffer[1] or type(self.writebuffer) == "string") then
|
||||
if self.writebuffer and #self.writebuffer ~= 0 then
|
||||
self._connected = false;
|
||||
self:set(false, true); -- Flush final buffer contents
|
||||
self:setreadtimeout(false);
|
||||
|
@ -701,7 +732,7 @@ end
|
|||
function interface:starttls(tls_ctx)
|
||||
if tls_ctx then self.tls_ctx = tls_ctx; end
|
||||
self.starttls = false;
|
||||
if self.writebuffer and (self.writebuffer[1] or type(self.writebuffer) == "string") then
|
||||
if self.writebuffer and #self.writebuffer ~= 0 then
|
||||
self:debug("Start TLS after write");
|
||||
self.ondrain = interface.starttls;
|
||||
self:set(nil, true); -- make sure wantwrite is set
|
||||
|
@ -935,7 +966,10 @@ function interface:resume_writes()
|
|||
end
|
||||
self:noise("Resume writes");
|
||||
self._write_lock = nil;
|
||||
if self.writebuffer and (self.writebuffer[1] or type(self.writebuffer) == "string") then
|
||||
if self.writebuffer and #self.writebuffer ~= 0 then
|
||||
if cfg.opportunistic_writes then
|
||||
return self:onwritable();
|
||||
end
|
||||
self:setwritetimeout();
|
||||
self:set(nil, true);
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue