mod_mam: Perform message expiry based on building an index by date (backport of 39ee70fbb009 from trunk)

For each day, store a set of users that have new messages. To expire
messages, we collect the union of sets of users from dates that fall
outside the cleanup range.

The previous algoritm did not work well with many users, especially with
the default settings.
This commit is contained in:
Kim Alvefur 2019-03-22 17:32:56 +01:00
parent fffb4ee43d
commit 5c3633477b

View file

@ -33,7 +33,7 @@ local is_stanza = st.is_stanza;
local tostring = tostring;
local time_now = os.time;
local m_min = math.min;
local timestamp, timestamp_parse = require "util.datetime".datetime, require "util.datetime".parse;
local timestamp, timestamp_parse, datestamp = import( "util.datetime", "datetime", "parse", "date");
local default_max_items, max_max_items = 20, module:get_option_number("max_archive_query_results", 50);
local strip_tags = module:get_option_set("dont_archive_namespaces", { "http://jabber.org/protocol/chatstates" });
@ -46,13 +46,8 @@ if not archive.find then
end
local use_total = module:get_option_boolean("mam_include_total", true);
local cleanup;
local function schedule_cleanup(username)
if cleanup and not cleanup[username] then
table.insert(cleanup, username);
cleanup[username] = true;
end
function schedule_cleanup()
-- replaced by non-noop later if cleanup is enabled
end
-- Handle prefs.
@ -96,7 +91,6 @@ module:hook("iq-set/self/"..xmlns_mam..":query", function(event)
local qid = query.attr.queryid;
get_prefs(origin.username, true);
schedule_cleanup(origin.username);
-- Search query parameters
local qwith, qstart, qend;
@ -212,6 +206,7 @@ end
local function shall_store(user, who)
-- TODO Cache this?
if not um.user_exists(user, host) then
module:log("debug", "%s@%s does not exist", user, host)
return false;
end
local prefs = get_prefs(user);
@ -329,6 +324,9 @@ module:hook("pre-message/full", strip_stanza_id_after_other_events, -1);
local cleanup_after = module:get_option_string("archive_expires_after", "1w");
local cleanup_interval = module:get_option_number("archive_cleanup_interval", 4 * 60 * 60);
if cleanup_after ~= "never" then
local cleanup_storage = module:open_store("archive_cleanup");
local cleanup_map = module:open_store("archive_cleanup", "map");
local day = 86400;
local multipliers = { d = day, w = day * 7, m = 31 * day, y = 365.2425 * day };
local n, m = cleanup_after:lower():match("(%d+)%s*([dwmy]?)");
@ -346,33 +344,47 @@ if cleanup_after ~= "never" then
return false;
end
-- Set of known users to do message expiry for
-- Populated either below or when new messages are added
cleanup = {};
-- For each day, store a set of users that have new messages. To expire
-- messages, we collect the union of sets of users from dates that fall
-- outside the cleanup range.
-- Iterating over users is not supported by all authentication modules
-- Catch and ignore error if not supported
pcall(function ()
-- If this works, then we schedule cleanup for all known users on startup
for user in um.users(module.host) do
schedule_cleanup(user);
function schedule_cleanup(username, date)
cleanup_map:set(date or datestamp(), username, true);
end
cleanup_runner = require "util.async".runner(function ()
local users = {};
local cut_off = datestamp(os.time() - cleanup_after);
for date in cleanup_storage:users() do
if date <= cut_off then
module:log("debug", "Messages from %q should be expired", date);
local messages_this_day = cleanup_storage:get(date);
if messages_this_day then
for user in pairs(messages_this_day) do
users[user] = true;
end
if date < cut_off then
-- Messages from the same day as the cut-off might not have expired yet,
-- but all earlier will have, so clear storage for those days.
cleanup_storage:set(date, nil);
end
end
end
end
local sum, num_users = 0, 0;
for user in pairs(users) do
local ok, err = archive:delete(user, { ["end"] = os.time() - cleanup_after; })
if ok then
num_users = num_users + 1;
sum = sum + (tonumber(ok) or 0);
end
end
module:log("info", "Deleted %d expired messages for %d users", sum, num_users);
end);
-- At odd intervals, delete old messages for one user
module:add_timer(math.random(10, 60), function()
local user = table.remove(cleanup, 1);
if user then
module:log("debug", "Removing old messages for user %q", user);
local ok, err = archive:delete(user, { ["end"] = os.time() - cleanup_after; })
if not ok then
module:log("warn", "Could not expire archives for user %s: %s", user, err);
elseif type(ok) == "number" then
module:log("debug", "Removed %d messages", ok);
end
cleanup[user] = nil;
end
return math.random(cleanup_interval, cleanup_interval * 2);
cleanup_task = module:add_timer(1, function ()
cleanup_runner:run(true);
return cleanup_interval;
end);
else
module:log("debug", "Archive expiry disabled");