mirror of
https://github.com/bjc/prosody.git
synced 2025-04-06 22:57:38 +03:00
mod_storage_internal,_sql: Add limit to number of items in an archive store (fixes #733)
This commit is contained in:
parent
cabf11ab26
commit
d7761bd914
2 changed files with 73 additions and 1 deletions
|
@ -1,3 +1,4 @@
|
||||||
|
local cache = require "util.cache";
|
||||||
local datamanager = require "core.storagemanager".olddm;
|
local datamanager = require "core.storagemanager".olddm;
|
||||||
local array = require "util.array";
|
local array = require "util.array";
|
||||||
local datetime = require "util.datetime";
|
local datetime = require "util.datetime";
|
||||||
|
@ -7,6 +8,9 @@ local id = require "util.id".medium;
|
||||||
|
|
||||||
local host = module.host;
|
local host = module.host;
|
||||||
|
|
||||||
|
local archive_item_limit = module:get_option_number("storage_archive_item_limit", 1000);
|
||||||
|
local archive_item_count_cache = cache.new(module:get_option("storage_archive_item_limit_cache_size", 1000));
|
||||||
|
|
||||||
local driver = {};
|
local driver = {};
|
||||||
|
|
||||||
function driver:open(store, typ)
|
function driver:open(store, typ)
|
||||||
|
@ -54,28 +58,56 @@ function archive:append(username, key, value, when, with)
|
||||||
value.attr.stamp = datetime.datetime(when);
|
value.attr.stamp = datetime.datetime(when);
|
||||||
value.attr.stamp_legacy = datetime.legacy(when);
|
value.attr.stamp_legacy = datetime.legacy(when);
|
||||||
|
|
||||||
|
local item_count = archive_item_count_cache:get(username);
|
||||||
|
|
||||||
if key then
|
if key then
|
||||||
local items, err = datamanager.list_load(username, host, self.store);
|
local items, err = datamanager.list_load(username, host, self.store);
|
||||||
if not items and err then return items, err; end
|
if not items and err then return items, err; end
|
||||||
|
|
||||||
|
-- Check the quota
|
||||||
|
item_count = items and #items or 0;
|
||||||
|
archive_item_count_cache:set(username, item_count);
|
||||||
|
if item_count >= archive_item_limit then
|
||||||
|
module:log("debug", "%s reached or over quota, not adding to store", username);
|
||||||
|
return nil, "quota-limit";
|
||||||
|
end
|
||||||
|
|
||||||
if items then
|
if items then
|
||||||
|
-- Filter out any item with the same key as the one being added
|
||||||
items = array(items);
|
items = array(items);
|
||||||
items:filter(function (item)
|
items:filter(function (item)
|
||||||
return item.key ~= key;
|
return item.key ~= key;
|
||||||
end);
|
end);
|
||||||
|
|
||||||
value.key = key;
|
value.key = key;
|
||||||
items:push(value);
|
items:push(value);
|
||||||
local ok, err = datamanager.list_store(username, host, self.store, items);
|
local ok, err = datamanager.list_store(username, host, self.store, items);
|
||||||
if not ok then return ok, err; end
|
if not ok then return ok, err; end
|
||||||
|
archive_item_count_cache:set(username, #items);
|
||||||
return key;
|
return key;
|
||||||
end
|
end
|
||||||
else
|
else
|
||||||
|
if not item_count then -- Item count not cached?
|
||||||
|
-- We need to load the list to get the number of items currently stored
|
||||||
|
local items, err = datamanager.list_load(username, host, self.store);
|
||||||
|
if not items and err then return items, err; end
|
||||||
|
item_count = items and #items or 0;
|
||||||
|
archive_item_count_cache:set(username, item_count);
|
||||||
|
end
|
||||||
|
if item_count >= archive_item_limit then
|
||||||
|
module:log("debug", "%s reached or over quota, not adding to store", username);
|
||||||
|
return nil, "quota-limit";
|
||||||
|
end
|
||||||
key = id();
|
key = id();
|
||||||
end
|
end
|
||||||
|
|
||||||
|
module:log("debug", "%s has %d items out of %d limit", username, item_count, archive_item_limit);
|
||||||
|
|
||||||
value.key = key;
|
value.key = key;
|
||||||
|
|
||||||
local ok, err = datamanager.list_append(username, host, self.store, value);
|
local ok, err = datamanager.list_append(username, host, self.store, value);
|
||||||
if not ok then return ok, err; end
|
if not ok then return ok, err; end
|
||||||
|
archive_item_count_cache:set(username, item_count+1);
|
||||||
return key;
|
return key;
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -158,6 +190,7 @@ end
|
||||||
|
|
||||||
function archive:delete(username, query)
|
function archive:delete(username, query)
|
||||||
if not query or next(query) == nil then
|
if not query or next(query) == nil then
|
||||||
|
archive_item_count_cache:set(username, nil);
|
||||||
return datamanager.list_store(username, host, self.store, nil);
|
return datamanager.list_store(username, host, self.store, nil);
|
||||||
end
|
end
|
||||||
local items, err = datamanager.list_load(username, host, self.store);
|
local items, err = datamanager.list_load(username, host, self.store);
|
||||||
|
@ -165,6 +198,7 @@ function archive:delete(username, query)
|
||||||
if err then
|
if err then
|
||||||
return items, err;
|
return items, err;
|
||||||
end
|
end
|
||||||
|
archive_item_count_cache:set(username, 0);
|
||||||
-- Store is empty
|
-- Store is empty
|
||||||
return 0;
|
return 0;
|
||||||
end
|
end
|
||||||
|
@ -214,6 +248,7 @@ function archive:delete(username, query)
|
||||||
end
|
end
|
||||||
local ok, err = datamanager.list_store(username, host, self.store, items);
|
local ok, err = datamanager.list_store(username, host, self.store, items);
|
||||||
if not ok then return ok, err; end
|
if not ok then return ok, err; end
|
||||||
|
archive_item_count_cache:set(username, #items);
|
||||||
return count;
|
return count;
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
|
|
||||||
-- luacheck: ignore 212/self
|
-- luacheck: ignore 212/self
|
||||||
|
|
||||||
|
local cache = require "util.cache";
|
||||||
local json = require "util.json";
|
local json = require "util.json";
|
||||||
local sql = require "util.sql";
|
local sql = require "util.sql";
|
||||||
local xml_parse = require "util.xml".parse;
|
local xml_parse = require "util.xml".parse;
|
||||||
|
@ -148,6 +149,9 @@ end
|
||||||
|
|
||||||
--- Archive store API
|
--- Archive store API
|
||||||
|
|
||||||
|
local archive_item_limit = module:get_option_number("storage_archive_item_limit", 1000);
|
||||||
|
local archive_item_count_cache = cache.new(module:get_option("storage_archive_item_limit_cache_size", 1000));
|
||||||
|
|
||||||
-- luacheck: ignore 512 431/user 431/store
|
-- luacheck: ignore 512 431/user 431/store
|
||||||
local map_store = {};
|
local map_store = {};
|
||||||
map_store.__index = map_store;
|
map_store.__index = map_store;
|
||||||
|
@ -231,6 +235,32 @@ archive_store.caps = {
|
||||||
};
|
};
|
||||||
archive_store.__index = archive_store
|
archive_store.__index = archive_store
|
||||||
function archive_store:append(username, key, value, when, with)
|
function archive_store:append(username, key, value, when, with)
|
||||||
|
local item_count = archive_item_count_cache:get(username);
|
||||||
|
if not item_count then
|
||||||
|
local ok, ret = engine:transaction(function()
|
||||||
|
local count_sql = [[
|
||||||
|
SELECT COUNT(*) FROM "prosodyarchive"
|
||||||
|
WHERE "host"=? AND "user"=? AND "store"=?;
|
||||||
|
]];
|
||||||
|
local result = engine:select(count_sql, host, user, store);
|
||||||
|
if result then
|
||||||
|
for row in result do
|
||||||
|
item_count = row[1];
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end);
|
||||||
|
if not ok or not item_count then
|
||||||
|
module:log("error", "Failed while checking quota for %s: %s", username, ret);
|
||||||
|
return nil, "Failure while checking quota";
|
||||||
|
end
|
||||||
|
archive_item_count_cache:set(username, item_count);
|
||||||
|
end
|
||||||
|
|
||||||
|
module:log("debug", "%s has %d items out of %d limit", username, item_count, archive_item_limit);
|
||||||
|
if item_count >= archive_item_limit then
|
||||||
|
return nil, "quota-limit";
|
||||||
|
end
|
||||||
|
|
||||||
local user,store = username,self.store;
|
local user,store = username,self.store;
|
||||||
when = when or os.time();
|
when = when or os.time();
|
||||||
with = with or "";
|
with = with or "";
|
||||||
|
@ -245,12 +275,18 @@ function archive_store:append(username, key, value, when, with)
|
||||||
VALUES (?,?,?,?,?,?,?,?);
|
VALUES (?,?,?,?,?,?,?,?);
|
||||||
]];
|
]];
|
||||||
if key then
|
if key then
|
||||||
engine:delete(delete_sql, host, user or "", store, key);
|
local result, err = engine:delete(delete_sql, host, user or "", store, key);
|
||||||
|
if result then
|
||||||
|
item_count = item_count - result:affected();
|
||||||
|
archive_item_count_cache:set(username, item_count);
|
||||||
|
end
|
||||||
else
|
else
|
||||||
|
item_count = item_count + 1;
|
||||||
key = uuid.generate();
|
key = uuid.generate();
|
||||||
end
|
end
|
||||||
local t, encoded_value = assert(serialize(value));
|
local t, encoded_value = assert(serialize(value));
|
||||||
engine:insert(insert_sql, host, user or "", store, when, with, key, t, encoded_value);
|
engine:insert(insert_sql, host, user or "", store, when, with, key, t, encoded_value);
|
||||||
|
archive_item_count_cache:set(username, item_count+1);
|
||||||
return key;
|
return key;
|
||||||
end);
|
end);
|
||||||
if not ok then return ok, ret; end
|
if not ok then return ok, ret; end
|
||||||
|
@ -422,6 +458,7 @@ function archive_store:delete(username, query)
|
||||||
end
|
end
|
||||||
return engine:delete(sql_query, unpack(args));
|
return engine:delete(sql_query, unpack(args));
|
||||||
end);
|
end);
|
||||||
|
archive_item_count_cache:set(username, nil);
|
||||||
return ok and stmt:affected(), stmt;
|
return ok and stmt:affected(), stmt;
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue