mirror of
https://github.com/bjc/prosody.git
synced 2025-04-01 20:27:39 +03:00
Deleting one index should be safe enough to do without requiring a explicitly upgrading during downtime. People had trouble with this.
1042 lines
33 KiB
Lua
1042 lines
33 KiB
Lua
|
|
-- luacheck: ignore 212/self
|
|
|
|
local cache = require "prosody.util.cache";
|
|
local json = require "prosody.util.json";
|
|
local xml_parse = require "prosody.util.xml".parse;
|
|
local uuid = require "prosody.util.uuid";
|
|
local resolve_relative_path = require "prosody.util.paths".resolve_relative_path;
|
|
local jid_join = require "prosody.util.jid".join;
|
|
|
|
local is_stanza = require"prosody.util.stanza".is_stanza;
|
|
local t_concat = table.concat;
|
|
|
|
local have_dbisql, dbisql = pcall(require, "prosody.util.sql");
|
|
local have_sqlite, sqlite = pcall(require, "prosody.util.sqlite3");
|
|
if not have_dbisql then
|
|
module:log("debug", "Could not load LuaDBI: %s", dbisql)
|
|
dbisql = nil;
|
|
end
|
|
if not have_sqlite then
|
|
module:log("debug", "Could not load LuaSQLite3: %s", sqlite)
|
|
sqlite = nil;
|
|
end
|
|
if not (have_dbisql or have_sqlite) then
|
|
module:log("error", "LuaDBI or LuaSQLite3 are required for using SQL databases but neither are installed");
|
|
module:log("error", "Please install at least one of LuaDBI and LuaSQLite3. See https://prosody.im/doc/depends");
|
|
error("No SQL library available")
|
|
end
|
|
|
|
local noop = function() end
|
|
local unpack = table.unpack;
|
|
local function iterator(result)
|
|
return function(result_)
|
|
local row = result_();
|
|
if row ~= nil then
|
|
return unpack(row);
|
|
end
|
|
end, result, nil;
|
|
end
|
|
|
|
-- COMPAT Support for UPSERT is not in all versions of all compatible databases.
|
|
local function has_upsert(engine)
|
|
if engine.params.driver == "SQLite3" then
|
|
-- SQLite3 >= 3.24.0
|
|
return engine.sqlite_version and (engine.sqlite_version[2] or 0) >= 24;
|
|
elseif engine.params.driver == "PostgreSQL" then
|
|
-- PostgreSQL >= 9.5
|
|
-- Versions without support have long since reached end of life.
|
|
return true;
|
|
end
|
|
-- We don't support UPSERT on MySQL/MariaDB, they seem to have a completely different syntax, uncertaint from which versions.
|
|
return false
|
|
end
|
|
|
|
local default_params = { driver = "SQLite3" };
|
|
|
|
local engine;
|
|
|
|
local function serialize(value)
|
|
local t = type(value);
|
|
if t == "string" or t == "boolean" or t == "number" then
|
|
return t, tostring(value);
|
|
elseif is_stanza(value) then
|
|
return "xml", tostring(value);
|
|
elseif t == "table" then
|
|
local encoded,err = json.encode(value);
|
|
if encoded then return "json", encoded; end
|
|
return nil, err;
|
|
end
|
|
return nil, "Unhandled value type: "..t;
|
|
end
|
|
local function deserialize(t, value)
|
|
if t == "string" then return value;
|
|
elseif t == "boolean" then
|
|
if value == "true" then return true;
|
|
elseif value == "false" then return false; end
|
|
return nil, "invalid-boolean";
|
|
elseif t == "number" then
|
|
value = tonumber(value);
|
|
if value then return value; end
|
|
return nil, "invalid-number";
|
|
elseif t == "json" then
|
|
return json.decode(value);
|
|
elseif t == "xml" then
|
|
return xml_parse(value);
|
|
end
|
|
return nil, "Unhandled value type: "..t;
|
|
end
|
|
|
|
local host = module.host;
|
|
|
|
local function keyval_store_get(user, store)
|
|
local haveany;
|
|
local result = {};
|
|
local select_sql = [[
|
|
SELECT "key","type","value"
|
|
FROM "prosody"
|
|
WHERE "host"=? AND "user"=? AND "store"=?;
|
|
]]
|
|
for row in engine:select(select_sql, host, user or "", store) do
|
|
haveany = true;
|
|
local k = row[1];
|
|
local v, e = deserialize(row[2], row[3]);
|
|
assert(v ~= nil, e);
|
|
if k and v then
|
|
if k ~= "" then result[k] = v; elseif type(v) == "table" then
|
|
for a,b in pairs(v) do
|
|
result[a] = b;
|
|
end
|
|
end
|
|
end
|
|
end
|
|
if haveany then
|
|
return result;
|
|
end
|
|
end
|
|
local function keyval_store_set(data, user, store)
|
|
local delete_sql = [[
|
|
DELETE FROM "prosody"
|
|
WHERE "host"=? AND "user"=? AND "store"=?
|
|
]];
|
|
engine:delete(delete_sql, host, user or "", store);
|
|
|
|
local insert_sql = [[
|
|
INSERT INTO "prosody"
|
|
("host","user","store","key","type","value")
|
|
VALUES (?,?,?,?,?,?);
|
|
]]
|
|
if data and next(data) ~= nil then
|
|
local extradata = {};
|
|
for key, value in pairs(data) do
|
|
if type(key) == "string" and key ~= "" then
|
|
local t, encoded_value = assert(serialize(value));
|
|
engine:insert(insert_sql, host, user or "", store, key, t, encoded_value);
|
|
else
|
|
extradata[key] = value;
|
|
end
|
|
end
|
|
if next(extradata) ~= nil then
|
|
local t, encoded_extradata = assert(serialize(extradata));
|
|
engine:insert(insert_sql, host, user or "", store, "", t, encoded_extradata);
|
|
end
|
|
end
|
|
return true;
|
|
end
|
|
|
|
--- Key/value store API (default store type)
|
|
|
|
local keyval_store = {};
|
|
keyval_store.__index = keyval_store;
|
|
function keyval_store:get(username)
|
|
local ok, result = engine:transaction(keyval_store_get, username, self.store);
|
|
if not ok then
|
|
module:log("error", "Unable to read from database %s store for %s: %s", self.store, username or "<host>", result);
|
|
return nil, result;
|
|
end
|
|
return result;
|
|
end
|
|
function keyval_store:set(username, data)
|
|
return engine:transaction(keyval_store_set, data, username, self.store);
|
|
end
|
|
function keyval_store:users()
|
|
local ok, result = engine:transaction(function()
|
|
local select_sql = [[
|
|
SELECT DISTINCT "user"
|
|
FROM "prosody"
|
|
WHERE "host"=? AND "store"=?;
|
|
]];
|
|
return engine:select(select_sql, host, self.store);
|
|
end);
|
|
if not ok then error(result); end
|
|
return iterator(result);
|
|
end
|
|
|
|
--- Archive store API
|
|
|
|
local archive_item_limit = module:get_option_integer("storage_archive_item_limit", nil, 0);
|
|
local archive_item_count_cache = cache.new(module:get_option_integer("storage_archive_item_limit_cache_size", 1000, 1));
|
|
|
|
local item_count_cache_hit = module:measure("item_count_cache_hit", "rate");
|
|
local item_count_cache_miss = module:measure("item_count_cache_miss", "rate")
|
|
|
|
-- luacheck: ignore 512 431/user 431/store 431/err
|
|
local map_store = {};
|
|
map_store.__index = map_store;
|
|
map_store.remove = {};
|
|
function map_store:get(username, key)
|
|
local ok, result = engine:transaction(function()
|
|
local query = [[
|
|
SELECT "type", "value"
|
|
FROM "prosody"
|
|
WHERE "host"=? AND "user"=? AND "store"=? AND "key"=?
|
|
LIMIT 1
|
|
]];
|
|
local data, err;
|
|
if type(key) == "string" and key ~= "" then
|
|
for row in engine:select(query, host, username or "", self.store, key) do
|
|
data, err = deserialize(row[1], row[2]);
|
|
assert(data ~= nil, err);
|
|
end
|
|
return data;
|
|
else
|
|
for row in engine:select(query, host, username or "", self.store, "") do
|
|
data, err = deserialize(row[1], row[2]);
|
|
assert(data ~= nil, err);
|
|
end
|
|
return data and data[key] or nil;
|
|
end
|
|
end);
|
|
if not ok then return nil, result; end
|
|
return result;
|
|
end
|
|
function map_store:set(username, key, data)
|
|
if data == nil then data = self.remove; end
|
|
return self:set_keys(username, { [key] = data });
|
|
end
|
|
function map_store:set_keys(username, keydatas)
|
|
local ok, result = engine:transaction(function()
|
|
local delete_sql = [[
|
|
DELETE FROM "prosody"
|
|
WHERE "host"=? AND "user"=? AND "store"=? AND "key"=?;
|
|
]];
|
|
local insert_sql = [[
|
|
INSERT INTO "prosody"
|
|
("host","user","store","key","type","value")
|
|
VALUES (?,?,?,?,?,?);
|
|
]];
|
|
local upsert_sql = [[
|
|
INSERT INTO "prosody"
|
|
("host","user","store","key","type","value")
|
|
VALUES (?,?,?,?,?,?)
|
|
ON CONFLICT ("host", "user","store", "key")
|
|
DO UPDATE SET "type"=?, "value"=?;
|
|
]];
|
|
local select_extradata_sql = [[
|
|
SELECT "type", "value"
|
|
FROM "prosody"
|
|
WHERE "host"=? AND "user"=? AND "store"=? AND "key"=?
|
|
LIMIT 1;
|
|
]];
|
|
for key, data in pairs(keydatas) do
|
|
if type(key) == "string" and key ~= "" and has_upsert(engine) and data ~= self.remove then
|
|
local t, value = assert(serialize(data));
|
|
engine:insert(upsert_sql, host, username or "", self.store, key, t, value, t, value);
|
|
elseif type(key) == "string" and key ~= "" then
|
|
engine:delete(delete_sql,
|
|
host, username or "", self.store, key);
|
|
if data ~= self.remove then
|
|
local t, value = assert(serialize(data));
|
|
engine:insert(insert_sql, host, username or "", self.store, key, t, value);
|
|
end
|
|
else
|
|
local extradata, err = {};
|
|
for row in engine:select(select_extradata_sql, host, username or "", self.store, "") do
|
|
extradata, err = deserialize(row[1], row[2]);
|
|
assert(extradata ~= nil, err);
|
|
end
|
|
engine:delete(delete_sql, host, username or "", self.store, "");
|
|
extradata[key] = data;
|
|
local t, value = assert(serialize(extradata));
|
|
engine:insert(insert_sql, host, username or "", self.store, "", t, value);
|
|
end
|
|
end
|
|
return true;
|
|
end);
|
|
if not ok then return nil, result; end
|
|
return result;
|
|
end
|
|
|
|
function map_store:get_all(key)
|
|
if type(key) ~= "string" or key == "" then
|
|
return nil, "get_all only supports non-empty string keys";
|
|
end
|
|
local ok, result = engine:transaction(function()
|
|
local query = [[
|
|
SELECT "user", "type", "value"
|
|
FROM "prosody"
|
|
WHERE "host"=? AND "store"=? AND "key"=?
|
|
]];
|
|
|
|
local data;
|
|
for row in engine:select(query, host, self.store, key) do
|
|
local key_data, err = deserialize(row[2], row[3]);
|
|
assert(key_data ~= nil, err);
|
|
if data == nil then
|
|
data = {};
|
|
end
|
|
data[row[1]] = key_data;
|
|
end
|
|
|
|
return data;
|
|
|
|
end);
|
|
if not ok then return nil, result; end
|
|
return result;
|
|
end
|
|
|
|
function map_store:delete_all(key)
|
|
if type(key) ~= "string" or key == "" then
|
|
return nil, "delete_all only supports non-empty string keys";
|
|
end
|
|
local ok, result = engine:transaction(function()
|
|
local delete_sql = [[
|
|
DELETE FROM "prosody"
|
|
WHERE "host"=? AND "store"=? AND "key"=?;
|
|
]];
|
|
engine:delete(delete_sql, host, self.store, key);
|
|
return true;
|
|
end);
|
|
if not ok then return nil, result; end
|
|
return result;
|
|
end
|
|
|
|
local archive_store = {}
|
|
archive_store.caps = {
|
|
total = true;
|
|
quota = archive_item_limit;
|
|
truncate = true;
|
|
full_id_range = true;
|
|
ids = true;
|
|
wildcard_delete = true;
|
|
};
|
|
archive_store.__index = archive_store
|
|
function archive_store:append(username, key, value, when, with)
|
|
local user,store = username,self.store;
|
|
local cache_key = jid_join(username, host, store);
|
|
local item_count = archive_item_count_cache:get(cache_key);
|
|
|
|
if archive_item_limit then
|
|
if not item_count then
|
|
item_count_cache_miss();
|
|
local ok, ret = engine:transaction(function()
|
|
local count_sql = [[
|
|
SELECT COUNT(*) FROM "prosodyarchive"
|
|
WHERE "host"=? AND "user"=? AND "store"=?;
|
|
]];
|
|
local result = engine:select(count_sql, host, user, store);
|
|
if result then
|
|
for row in result do
|
|
item_count = row[1];
|
|
end
|
|
end
|
|
end);
|
|
if not ok or not item_count then
|
|
module:log("error", "Failed while checking quota for %s: %s", username, ret);
|
|
return nil, "Failure while checking quota";
|
|
end
|
|
archive_item_count_cache:set(cache_key, item_count);
|
|
else
|
|
item_count_cache_hit();
|
|
end
|
|
|
|
module:log("debug", "%s has %d items out of %d limit", username, item_count, archive_item_limit);
|
|
if item_count >= archive_item_limit then
|
|
return nil, "quota-limit";
|
|
end
|
|
end
|
|
|
|
-- FIXME update the schema to allow precision timestamps
|
|
when = when or os.time();
|
|
if engine.params.driver ~= "SQLite3" then
|
|
-- SQLite3 doesn't enforce types :)
|
|
when = math.floor(when);
|
|
end
|
|
with = with or "";
|
|
local ok, ret = engine:transaction(function()
|
|
local delete_sql = [[
|
|
DELETE FROM "prosodyarchive"
|
|
WHERE "host"=? AND "user"=? AND "store"=? AND "key"=?;
|
|
]];
|
|
local insert_sql = [[
|
|
INSERT INTO "prosodyarchive"
|
|
("host", "user", "store", "when", "with", "key", "type", "value")
|
|
VALUES (?,?,?,?,?,?,?,?);
|
|
]];
|
|
if key then
|
|
-- TODO use UPSERT like map store
|
|
local result = engine:delete(delete_sql, host, user or "", store, key);
|
|
if result and item_count then
|
|
item_count = item_count - result:affected();
|
|
end
|
|
else
|
|
key = uuid.v7();
|
|
end
|
|
local t, encoded_value = assert(serialize(value));
|
|
engine:insert(insert_sql, host, user or "", store, when, with, key, t, encoded_value);
|
|
if item_count then
|
|
archive_item_count_cache:set(cache_key, item_count+1);
|
|
end
|
|
return key;
|
|
end);
|
|
if not ok then return ok, ret; end
|
|
return ret; -- the key
|
|
end
|
|
|
|
-- Helpers for building the WHERE clause
|
|
local function archive_where(query, args, where)
|
|
-- Time range, inclusive
|
|
if query.start then
|
|
args[#args+1] = math.floor(query.start);
|
|
where[#where+1] = "\"when\" >= ?"
|
|
end
|
|
|
|
if query["end"] then
|
|
args[#args+1] = math.floor(query["end"]);
|
|
if query.start then
|
|
where[#where] = "\"when\" BETWEEN ? AND ?" -- is this inclusive?
|
|
else
|
|
where[#where+1] = "\"when\" <= ?"
|
|
end
|
|
end
|
|
|
|
-- Related name
|
|
if query.with then
|
|
where[#where+1] = "\"with\" = ?";
|
|
args[#args+1] = query.with
|
|
end
|
|
|
|
-- Unique id
|
|
if query.key then
|
|
where[#where+1] = "\"key\" = ?";
|
|
args[#args+1] = query.key
|
|
end
|
|
|
|
-- Set of ids
|
|
if query.ids then
|
|
local nids, nargs = #query.ids, #args;
|
|
where[#where + 1] = "\"key\" IN (" .. string.rep("?", nids, ",") .. ")";
|
|
for i, id in ipairs(query.ids) do
|
|
args[nargs+i] = id;
|
|
end
|
|
end
|
|
end
|
|
local function archive_where_id_range(query, args, where)
|
|
-- Before or after specific item, exclusive
|
|
local id_lookup_sql = [[
|
|
SELECT "sort_id"
|
|
FROM "prosodyarchive"
|
|
WHERE "key" = ? AND "host" = ? AND "user" = ? AND "store" = ?
|
|
LIMIT 1;
|
|
]];
|
|
if query.after then -- keys better be unique!
|
|
local after_id = nil;
|
|
for row in engine:select(id_lookup_sql, query.after, args[1], args[2], args[3]) do
|
|
after_id = row[1];
|
|
end
|
|
if not after_id then
|
|
return nil, "item-not-found";
|
|
end
|
|
where[#where+1] = '"sort_id" > ?';
|
|
args[#args+1] = after_id;
|
|
end
|
|
if query.before then
|
|
local before_id = nil;
|
|
for row in engine:select(id_lookup_sql, query.before, args[1], args[2], args[3]) do
|
|
before_id = row[1];
|
|
end
|
|
if not before_id then
|
|
return nil, "item-not-found";
|
|
end
|
|
where[#where+1] = '"sort_id" < ?';
|
|
args[#args+1] = before_id;
|
|
end
|
|
return true;
|
|
end
|
|
|
|
function archive_store:find(username, query)
|
|
query = query or {};
|
|
local user,store = username,self.store;
|
|
local cache_key = jid_join(username, host, self.store);
|
|
local total = archive_item_count_cache:get(cache_key);
|
|
(total and item_count_cache_hit or item_count_cache_miss)();
|
|
if query.start == nil and query.with == nil and query["end"] == nil and query.key == nil and query.ids == nil then
|
|
-- the query is for the whole archive, so a cached 'total' should be a
|
|
-- relatively accurate response if that's all that is requested
|
|
if total ~= nil and query.limit == 0 then return noop, total; end
|
|
else
|
|
-- not usable, so refresh it later if needed
|
|
total = nil;
|
|
end
|
|
local ok, result, err = engine:transaction(function()
|
|
local sql_query = [[
|
|
SELECT "key", "type", "value", "when", "with"
|
|
FROM "prosodyarchive"
|
|
WHERE %s
|
|
ORDER BY "sort_id" %s%s;
|
|
]];
|
|
local args = { host, user or "", store, };
|
|
local where = { "\"host\" = ?", "\"user\" = ?", "\"store\" = ?", };
|
|
|
|
archive_where(query, args, where);
|
|
|
|
-- Total matching
|
|
if query.total and not total then
|
|
|
|
local stats = engine:select("SELECT COUNT(*) FROM \"prosodyarchive\" WHERE "
|
|
.. t_concat(where, " AND "), unpack(args));
|
|
if stats then
|
|
for row in stats do
|
|
total = row[1];
|
|
end
|
|
end
|
|
if query.start == nil and query.with == nil and query["end"] == nil and query.key == nil and query.ids == nil then
|
|
archive_item_count_cache:set(cache_key, total);
|
|
end
|
|
if query.limit == 0 then -- Skip the real query
|
|
return noop, total;
|
|
end
|
|
end
|
|
|
|
local ok, err = archive_where_id_range(query, args, where);
|
|
if not ok then return ok, err; end
|
|
|
|
sql_query = sql_query:format(t_concat(where, " AND "), query.reverse
|
|
and "DESC" or "ASC", query.limit and " LIMIT " .. query.limit or "");
|
|
return engine:select(sql_query, unpack(args));
|
|
end);
|
|
if not ok then return ok, result; end
|
|
if not result then return nil, err; end
|
|
return function()
|
|
local row = result();
|
|
if row ~= nil then
|
|
local value, err = deserialize(row[2], row[3]);
|
|
assert(value ~= nil, err);
|
|
return row[1], value, row[4], row[5];
|
|
end
|
|
end, total;
|
|
end
|
|
|
|
function archive_store:get(username, key)
|
|
local iter, err = self:find(username, { key = key })
|
|
if not iter then return iter, err; end
|
|
for _, stanza, when, with in iter do
|
|
return stanza, when, with;
|
|
end
|
|
return nil, "item-not-found";
|
|
end
|
|
|
|
function archive_store:set(username, key, new_value, new_when, new_with)
|
|
local user,store = username,self.store;
|
|
local ok, result = engine:transaction(function ()
|
|
|
|
local update_query = [[
|
|
UPDATE "prosodyarchive"
|
|
SET %s
|
|
WHERE %s
|
|
]];
|
|
local args = { host, user or "", store, key };
|
|
local setf = {};
|
|
local where = { "\"host\" = ?", "\"user\" = ?", "\"store\" = ?", "\"key\" = ?"};
|
|
|
|
if new_value then
|
|
table.insert(setf, '"type" = ?')
|
|
table.insert(setf, '"value" = ?')
|
|
local t, value = serialize(new_value);
|
|
table.insert(args, 1, t);
|
|
table.insert(args, 2, value);
|
|
end
|
|
|
|
if new_when then
|
|
table.insert(setf, 1, '"when" = ?')
|
|
table.insert(args, 1, new_when);
|
|
end
|
|
|
|
if new_with then
|
|
table.insert(setf, 1, '"with" = ?')
|
|
table.insert(args, 1, new_with);
|
|
end
|
|
|
|
update_query = update_query:format(t_concat(setf, ", "), t_concat(where, " AND "));
|
|
return engine:update(update_query, unpack(args));
|
|
end);
|
|
if not ok then return ok, result; end
|
|
return result:affected() == 1;
|
|
end
|
|
|
|
function archive_store:summary(username, query)
|
|
query = query or {};
|
|
local user,store = username,self.store;
|
|
local ok, result = engine:transaction(function()
|
|
local sql_query = [[
|
|
SELECT DISTINCT "with", COUNT(*), MIN("when"), MAX("when")
|
|
FROM "prosodyarchive"
|
|
WHERE %s
|
|
GROUP BY "with";
|
|
]];
|
|
local args = { host, user or "", store, };
|
|
local where = { "\"host\" = ?", "\"user\" = ?", "\"store\" = ?", };
|
|
|
|
archive_where(query, args, where);
|
|
|
|
archive_where_id_range(query, args, where);
|
|
|
|
if query.limit then
|
|
args[#args+1] = query.limit;
|
|
end
|
|
|
|
sql_query = sql_query:format(t_concat(where, " AND "));
|
|
return engine:select(sql_query, unpack(args));
|
|
end);
|
|
if not ok then return ok, result end
|
|
local counts = {};
|
|
local earliest, latest = {}, {};
|
|
for row in result do
|
|
local with, count = row[1], row[2];
|
|
counts[with] = count;
|
|
earliest[with] = row[3];
|
|
latest[with] = row[4];
|
|
end
|
|
return {
|
|
counts = counts;
|
|
earliest = earliest;
|
|
latest = latest;
|
|
};
|
|
end
|
|
|
|
function archive_store:delete(username, query)
|
|
query = query or {};
|
|
local user,store = username,self.store;
|
|
local ok, stmt = engine:transaction(function()
|
|
local sql_query = "DELETE FROM \"prosodyarchive\" WHERE %s;";
|
|
local args = { host, user or "", store, };
|
|
local where = { "\"host\" = ?", "\"user\" = ?", "\"store\" = ?", };
|
|
if user == true then
|
|
table.remove(args, 2);
|
|
table.remove(where, 2);
|
|
end
|
|
archive_where(query, args, where);
|
|
local ok, err = archive_where_id_range(query, args, where);
|
|
if not ok then return ok, err; end
|
|
if query.truncate == nil then
|
|
sql_query = sql_query:format(t_concat(where, " AND "));
|
|
elseif engine.params.driver == "MySQL" then
|
|
sql_query = [[
|
|
DELETE result FROM prosodyarchive AS result JOIN (
|
|
SELECT sort_id FROM prosodyarchive
|
|
WHERE %s
|
|
ORDER BY "sort_id" %s
|
|
LIMIT 18446744073709551615 OFFSET %s
|
|
) AS limiter on result.sort_id = limiter.sort_id;]];
|
|
|
|
sql_query = string.format(sql_query, t_concat(where, " AND "),
|
|
query.reverse and "ASC" or "DESC", query.truncate);
|
|
else
|
|
args[#args+1] = query.truncate;
|
|
local unlimited = "ALL";
|
|
sql_query = [[
|
|
DELETE FROM "prosodyarchive"
|
|
WHERE "sort_id" IN (
|
|
SELECT "sort_id" FROM "prosodyarchive"
|
|
WHERE %s
|
|
ORDER BY "sort_id" %s
|
|
LIMIT %s OFFSET ?
|
|
);]];
|
|
if engine.params.driver == "SQLite3" then
|
|
if engine.sqlite_compile_options.enable_update_delete_limit then
|
|
sql_query = [[
|
|
DELETE FROM "prosodyarchive"
|
|
WHERE %s
|
|
ORDER BY "sort_id" %s
|
|
LIMIT %s OFFSET ?;
|
|
]];
|
|
end
|
|
unlimited = "-1";
|
|
end
|
|
sql_query = string.format(sql_query, t_concat(where, " AND "),
|
|
query.reverse and "ASC" or "DESC", unlimited);
|
|
end
|
|
return engine:delete(sql_query, unpack(args));
|
|
end);
|
|
if username == true then
|
|
archive_item_count_cache:clear();
|
|
else
|
|
local cache_key = jid_join(username, host, self.store);
|
|
if query.start == nil and query.with == nil and query["end"] == nil and query.key == nil and query.ids == nil and query.truncate == nil then
|
|
-- All items deleted, count should be zero.
|
|
archive_item_count_cache:set(cache_key, 0);
|
|
else
|
|
-- Not sure how many items left
|
|
archive_item_count_cache:set(cache_key, nil);
|
|
end
|
|
end
|
|
return ok and stmt:affected(), stmt;
|
|
end
|
|
|
|
function archive_store:users()
|
|
local ok, result = engine:transaction(function()
|
|
local select_sql = [[
|
|
SELECT DISTINCT "user"
|
|
FROM "prosodyarchive"
|
|
WHERE "host"=? AND "store"=?;
|
|
]];
|
|
return engine:select(select_sql, host, self.store);
|
|
end);
|
|
if not ok then error(result); end
|
|
return iterator(result);
|
|
end
|
|
|
|
local keyvalplus = {
|
|
__index = {
|
|
-- keyval
|
|
get = keyval_store.get;
|
|
set = keyval_store.set;
|
|
items = keyval_store.users;
|
|
-- map
|
|
get_key = map_store.get;
|
|
set_key = map_store.set;
|
|
remove = map_store.remove;
|
|
set_keys = map_store.set_keys;
|
|
get_key_from_all = map_store.get_all;
|
|
delete_key_from_all = map_store.delete_all;
|
|
};
|
|
}
|
|
|
|
local stores = {
|
|
keyval = keyval_store;
|
|
map = map_store;
|
|
archive = archive_store;
|
|
["keyval+"] = keyvalplus;
|
|
};
|
|
|
|
--- Implement storage driver API
|
|
|
|
-- FIXME: Some of these operations need to operate on the archive store(s) too
|
|
|
|
local driver = {};
|
|
|
|
function driver:open(store, typ)
|
|
local store_mt = stores[typ or "keyval"];
|
|
if store_mt then
|
|
return setmetatable({ store = store }, store_mt);
|
|
end
|
|
return nil, "unsupported-store";
|
|
end
|
|
|
|
function driver:stores(username)
|
|
local query = "SELECT DISTINCT \"store\" FROM \"prosody\" WHERE \"host\"=? AND \"user\"" ..
|
|
(username == true and "!=?" or "=?");
|
|
if username == true or not username then
|
|
username = "";
|
|
end
|
|
local ok, result = engine:transaction(function()
|
|
return engine:select(query, host, username);
|
|
end);
|
|
if not ok then return ok, result end
|
|
return iterator(result);
|
|
end
|
|
|
|
function driver:purge(username)
|
|
return engine:transaction(function()
|
|
engine:delete("DELETE FROM \"prosody\" WHERE \"host\"=? AND \"user\"=?", host, username);
|
|
engine:delete("DELETE FROM \"prosodyarchive\" WHERE \"host\"=? AND \"user\"=?", host, username);
|
|
end);
|
|
end
|
|
|
|
--- Initialization
|
|
|
|
|
|
local function create_table(engine) -- luacheck: ignore 431/engine
|
|
local sql = engine.params.driver == "SQLite3" and sqlite or dbisql;
|
|
local Table, Column, Index = sql.Table, sql.Column, sql.Index;
|
|
|
|
local ProsodyTable = Table {
|
|
name = "prosody";
|
|
Column { name="host", type="TEXT", nullable=false };
|
|
Column { name="user", type="TEXT", nullable=false };
|
|
Column { name="store", type="TEXT", nullable=false };
|
|
Column { name="key", type="TEXT", nullable=false };
|
|
Column { name="type", type="TEXT", nullable=false };
|
|
Column { name="value", type="MEDIUMTEXT", nullable=false };
|
|
Index { name = "prosody_unique_index"; unique = engine.params.driver ~= "MySQL"; "host"; "user"; "store"; "key" };
|
|
};
|
|
engine:transaction(function()
|
|
ProsodyTable:create(engine);
|
|
end);
|
|
|
|
local ProsodyArchiveTable = Table {
|
|
name="prosodyarchive";
|
|
Column { name="sort_id", type="INTEGER", primary_key=true, auto_increment=true };
|
|
Column { name="host", type="TEXT", nullable=false };
|
|
Column { name="user", type="TEXT", nullable=false };
|
|
Column { name="store", type="TEXT", nullable=false };
|
|
Column { name="key", type="TEXT", nullable=false }; -- item id
|
|
Column { name="when", type="INTEGER", nullable=false }; -- timestamp
|
|
Column { name="with", type="TEXT", nullable=false }; -- related id
|
|
Column { name="type", type="TEXT", nullable=false };
|
|
Column { name="value", type="MEDIUMTEXT", nullable=false };
|
|
Index { name="prosodyarchive_index", unique = engine.params.driver ~= "MySQL", "host", "user", "store", "key" };
|
|
Index { name="prosodyarchive_with_when", "host", "user", "store", "with", "when" };
|
|
Index { name="prosodyarchive_when", "host", "user", "store", "when" };
|
|
Index { name="prosodyarchive_sort", "host", "user", "store", "sort_id" };
|
|
};
|
|
engine:transaction(function()
|
|
ProsodyArchiveTable:create(engine);
|
|
end);
|
|
end
|
|
|
|
local function upgrade_table(engine, params, apply_changes) -- luacheck: ignore 431/engine
|
|
local changes = false;
|
|
if params.driver == "MySQL" then
|
|
local sql = dbisql;
|
|
local success,err = engine:transaction(function()
|
|
do
|
|
local result = assert(engine:execute("SHOW COLUMNS FROM \"prosody\" WHERE \"Field\"='value' and \"Type\"='text'"));
|
|
if result:rowcount() > 0 then
|
|
changes = true;
|
|
if apply_changes then
|
|
module:log("info", "Upgrading database schema (value column size)...");
|
|
assert(engine:execute("ALTER TABLE \"prosody\" MODIFY COLUMN \"value\" MEDIUMTEXT"));
|
|
module:log("info", "Database table automatically upgraded");
|
|
end
|
|
end
|
|
end
|
|
|
|
do
|
|
-- Ensure index is not unique (issue #1073)
|
|
local result = assert(engine:execute([[SHOW INDEX FROM prosodyarchive WHERE key_name='prosodyarchive_index' and non_unique=0]]));
|
|
if result:rowcount() > 0 then
|
|
changes = true;
|
|
if apply_changes then
|
|
module:log("info", "Upgrading database schema (prosodyarchive_index)...");
|
|
assert(engine:execute[[ALTER TABLE "prosodyarchive" DROP INDEX prosodyarchive_index;]]);
|
|
local new_index = sql.Index { table = "prosodyarchive", name="prosodyarchive_index", "host", "user", "store", "key" };
|
|
assert(engine:_create_index(new_index));
|
|
module:log("info", "Database table automatically upgraded");
|
|
end
|
|
end
|
|
end
|
|
return true;
|
|
end);
|
|
if not success then
|
|
module:log("error", "Failed to check/upgrade database schema (%s), please see "
|
|
.."https://prosody.im/doc/mysql for help",
|
|
err or "unknown error");
|
|
return false;
|
|
end
|
|
|
|
-- COMPAT w/pre-0.10: Upgrade table to UTF-8 if not already
|
|
local check_encoding_query = [[
|
|
SELECT "COLUMN_NAME","COLUMN_TYPE","TABLE_NAME"
|
|
FROM "information_schema"."columns"
|
|
WHERE "TABLE_NAME" LIKE 'prosody%%'
|
|
AND "TABLE_SCHEMA" = ?
|
|
AND ( "CHARACTER_SET_NAME"!=? OR "COLLATION_NAME"!=?);
|
|
]];
|
|
-- FIXME Is it ok to ignore the return values from this?
|
|
engine:transaction(function()
|
|
local result = assert(engine:execute(check_encoding_query, params.database, engine.charset, engine.charset.."_bin"));
|
|
local n_bad_columns = result:rowcount();
|
|
if n_bad_columns > 0 then
|
|
changes = true;
|
|
if apply_changes then
|
|
module:log("warn", "Found %d columns in prosody table requiring encoding change, updating now...", n_bad_columns);
|
|
local fix_column_query1 = "ALTER TABLE \"%s\" CHANGE \"%s\" \"%s\" BLOB;";
|
|
local fix_column_query2 = "ALTER TABLE \"%s\" CHANGE \"%s\" \"%s\" %s CHARACTER SET '%s' COLLATE '%s_bin';";
|
|
for row in result:rows() do
|
|
local column_name, column_type, table_name = unpack(row);
|
|
module:log("debug", "Fixing column %s in table %s", column_name, table_name);
|
|
engine:execute(fix_column_query1:format(table_name, column_name, column_name));
|
|
engine:execute(fix_column_query2:format(table_name, column_name, column_name, column_type, engine.charset, engine.charset));
|
|
end
|
|
module:log("info", "Database encoding upgrade complete!");
|
|
end
|
|
end
|
|
end);
|
|
success,err = engine:transaction(function()
|
|
return engine:execute(check_encoding_query, params.database,
|
|
engine.charset, engine.charset.."_bin");
|
|
end);
|
|
if not success then
|
|
module:log("error", "Failed to check/upgrade database encoding: %s", err or "unknown error");
|
|
return false;
|
|
end
|
|
else
|
|
local indices = {};
|
|
engine:transaction(function ()
|
|
if params.driver == "SQLite3" then
|
|
for row in engine:select [[SELECT "name" FROM "sqlite_schema" WHERE "type"='index' AND "tbl_name"='prosody' AND "name"='prosody_index';]] do
|
|
indices[row[1]] = true;
|
|
end
|
|
elseif params.driver == "PostgreSQL" then
|
|
for row in engine:select [[SELECT "indexname" FROM "pg_indexes" WHERE "tablename"='prosody' AND "indexname"='prosody_index';]] do
|
|
indices[row[1]] = true;
|
|
end
|
|
end
|
|
end)
|
|
if indices["prosody_index"] then
|
|
local success = engine:transaction(function ()
|
|
return assert(engine:execute([[DROP INDEX "prosody_index";]]));
|
|
end);
|
|
if not success then
|
|
module:log("error", "Failed to delete obsolete index \"prosody_index\"");
|
|
return false;
|
|
end
|
|
end
|
|
end
|
|
return changes;
|
|
end
|
|
|
|
local function normalize_database(driver, database) -- luacheck: ignore 431/driver
|
|
if driver == "SQLite3" and database ~= ":memory:" then
|
|
return resolve_relative_path(prosody.paths.data or ".", database or "prosody.sqlite");
|
|
end
|
|
return database;
|
|
end
|
|
|
|
local function normalize_params(params)
|
|
return {
|
|
driver = assert(params.driver,
|
|
"Configuration error: Both the SQL driver and the database need to be specified");
|
|
database = assert(normalize_database(params.driver, params.database),
|
|
"Configuration error: Both the SQL driver and the database need to be specified");
|
|
username = params.username;
|
|
password = params.password;
|
|
host = params.host;
|
|
port = params.port;
|
|
};
|
|
end
|
|
|
|
function module.load()
|
|
local engines = module:shared("/*/sql/connections");
|
|
local params = normalize_params(module:get_option("sql", default_params));
|
|
local sql = params.driver == "SQLite3" and sqlite or dbisql;
|
|
local db_uri = sql.db2uri(params);
|
|
engine = engines[db_uri];
|
|
if not engine then
|
|
module:log("debug", "Creating new engine %s", db_uri);
|
|
engine = sql:create_engine(params, function (engine) -- luacheck: ignore 431/engine
|
|
if module:get_option_boolean("sql_manage_tables", true) then
|
|
-- Automatically create table, ignore failure (table probably already exists)
|
|
-- FIXME: we should check in information_schema, etc.
|
|
create_table(engine);
|
|
-- Check whether the table needs upgrading
|
|
if upgrade_table(engine, params, false) then
|
|
module:log("error", "Old database format detected. Please run: prosodyctl mod_%s upgrade", module.name);
|
|
return false, "database upgrade needed";
|
|
end
|
|
end
|
|
if engine.params.driver == "SQLite3" then
|
|
local compile_options = {}
|
|
for row in engine:select("PRAGMA compile_options") do
|
|
local option = row[1]:lower();
|
|
local opt, val = option:match("^([^=]+)=(.*)$");
|
|
compile_options[opt or option] = tonumber(val) or val or true;
|
|
end
|
|
-- COMPAT Need to check SQLite3 version because SQLCipher 3.x was based on SQLite3 prior to 3.24.0 when UPSERT was introduced
|
|
for row in engine:select("SELECT sqlite_version()") do
|
|
local version = {};
|
|
for n in row[1]:gmatch("%d+") do
|
|
table.insert(version, tonumber(n));
|
|
end
|
|
engine.sqlite_version = version;
|
|
end
|
|
engine.sqlite_compile_options = compile_options;
|
|
|
|
local journal_mode = "delete";
|
|
for row in engine:select[[PRAGMA journal_mode;]] do
|
|
journal_mode = row[1];
|
|
end
|
|
|
|
-- Note: These things can't be changed with in a transaction. LuaDBI
|
|
-- opens a transaction automatically for every statement(?), so this
|
|
-- will not work there.
|
|
local tune = module:get_option_enum("sqlite_tune", "default", "normal", "fast", "safe");
|
|
if tune == "normal" then
|
|
if journal_mode ~= "wal" then
|
|
engine:execute("PRAGMA journal_mode=WAL;");
|
|
end
|
|
engine:execute("PRAGMA auto_vacuum=FULL;");
|
|
engine:execute("PRAGMA synchronous=NORMAL;")
|
|
elseif tune == "fast" then
|
|
if journal_mode ~= "wal" then
|
|
engine:execute("PRAGMA journal_mode=WAL;");
|
|
end
|
|
if compile_options.secure_delete then
|
|
engine:execute("PRAGMA secure_delete=FAST;");
|
|
end
|
|
engine:execute("PRAGMA synchronous=OFF;")
|
|
engine:execute("PRAGMA fullfsync=0;")
|
|
elseif tune == "safe" then
|
|
if journal_mode ~= "delete" then
|
|
engine:execute("PRAGMA journal_mode=DELETE;");
|
|
end
|
|
engine:execute("PRAGMA synchronous=EXTRA;")
|
|
engine:execute("PRAGMA fullfsync=1;")
|
|
end
|
|
|
|
for row in engine:select[[PRAGMA journal_mode;]] do
|
|
journal_mode = row[1];
|
|
end
|
|
|
|
module:log("debug", "SQLite3 database %q operating with journal_mode=%s", engine.params.database, journal_mode);
|
|
end
|
|
module:set_status("info", "Connected to " .. engine.params.driver);
|
|
end, function (engine) -- luacheck: ignore 431/engine
|
|
module:set_status("error", "Disconnected from " .. engine.params.driver);
|
|
end);
|
|
engines[sql.db2uri(params)] = engine;
|
|
else
|
|
module:set_status("info", "Using existing engine");
|
|
end
|
|
|
|
module:provides("storage", driver);
|
|
end
|
|
|
|
function module.command(arg)
|
|
local config = require "prosody.core.configmanager";
|
|
local hi = require "prosody.util.human.io";
|
|
local command = table.remove(arg, 1);
|
|
if command == "upgrade" then
|
|
-- We need to find every unique dburi in the config
|
|
local uris = {};
|
|
for host in pairs(prosody.hosts) do -- luacheck: ignore 431/host
|
|
local params = normalize_params(config.get(host, "sql") or default_params);
|
|
local sql = engine.params.driver == "SQLite3" and sqlite or dbisql;
|
|
uris[sql.db2uri(params)] = params;
|
|
end
|
|
print("We will check and upgrade the following databases:\n");
|
|
for _, params in pairs(uris) do
|
|
print("", "["..params.driver.."] "..params.database..(params.host and " on "..params.host or ""));
|
|
end
|
|
print("");
|
|
print("Ensure you have working backups of the above databases before continuing! ");
|
|
if false == hi.show_yesno("Continue with the database upgrade? [yN]") then
|
|
print("Ok, no upgrade. But you do have backups, don't you? ...don't you?? :-)");
|
|
return;
|
|
end
|
|
-- Upgrade each one
|
|
for _, params in pairs(uris) do
|
|
print("Checking "..params.database.."...");
|
|
local sql = params.driver == "SQLite3" and sqlite or dbisql;
|
|
engine = sql:create_engine(params);
|
|
upgrade_table(engine, params, true);
|
|
end
|
|
print("All done!");
|
|
elseif command then
|
|
print("Unknown command: "..command);
|
|
else
|
|
print("Available commands:");
|
|
print("","upgrade - Perform database upgrade");
|
|
end
|
|
end
|