mirror of
https://github.com/bjc/prosody.git
synced 2025-04-05 14:17:37 +03:00
util.async: Add some more comments for clarity
This commit is contained in:
parent
0ea8db9430
commit
e34fadb697
1 changed files with 15 additions and 0 deletions
|
@ -7,6 +7,8 @@ local function runner_continue(thread)
|
|||
end
|
||||
local ok, state, runner = coroutine.resume(thread);
|
||||
if not ok then
|
||||
-- Running the coroutine failed, which means we have to find the runner manually,
|
||||
-- in order to inform the error handler
|
||||
local level = 0;
|
||||
while debug.getinfo(thread, level, "") do level = level + 1; end
|
||||
ok, runner = debug.getlocal(thread, level-1, 1);
|
||||
|
@ -99,37 +101,48 @@ local function runner(func, watchers, data)
|
|||
, runner_mt);
|
||||
end
|
||||
|
||||
-- Add a task item for the runner to process
|
||||
function runner_mt:run(input)
|
||||
if input ~= nil then
|
||||
table.insert(self.queue, input);
|
||||
end
|
||||
if self.state ~= "ready" then
|
||||
-- The runner is busy. Indicate that the task item has been
|
||||
-- queued, and return information about the current runner state
|
||||
return true, self.state, #self.queue;
|
||||
end
|
||||
|
||||
local q, thread = self.queue, self.thread;
|
||||
if not thread or coroutine.status(thread) == "dead" then
|
||||
-- Create a new coroutine for this runner
|
||||
thread = runner_create_thread(self.func, self);
|
||||
self.thread = thread;
|
||||
end
|
||||
|
||||
-- Process task item(s) while the queue is not empty, and we're not blocked
|
||||
local n, state, err = #q, self.state, nil;
|
||||
self.state = "running";
|
||||
while n > 0 and state == "ready" do
|
||||
local consumed;
|
||||
-- Loop through queue items, and attempt to run them
|
||||
for i = 1,n do
|
||||
local input = q[i];
|
||||
local ok, new_state = coroutine.resume(thread, input);
|
||||
if not ok then
|
||||
-- There was an error running the coroutine, save the error, mark runner as ready to begin again
|
||||
consumed, state, err = i, "ready", debug.traceback(thread, new_state);
|
||||
self.thread = nil;
|
||||
break;
|
||||
elseif new_state == "wait" then
|
||||
-- Runner is blocked on waiting for a task item to complete
|
||||
consumed, state = i, "waiting";
|
||||
break;
|
||||
end
|
||||
end
|
||||
-- Loop ended - either queue empty because all tasks passed without blocking (consumed == nil)
|
||||
-- or runner is blocked/errored, and consumed will contain the number of tasks processed so far
|
||||
if not consumed then consumed = n; end
|
||||
-- Remove consumed items from the queue array
|
||||
if q[n+1] ~= nil then
|
||||
n = #q;
|
||||
end
|
||||
|
@ -138,6 +151,7 @@ function runner_mt:run(input)
|
|||
end
|
||||
n = #q;
|
||||
end
|
||||
-- Runner processed all items it can, so save current runner state
|
||||
self.state = state;
|
||||
if err or state ~= self.notified_state then
|
||||
if err then
|
||||
|
@ -151,6 +165,7 @@ function runner_mt:run(input)
|
|||
return true, state, n;
|
||||
end
|
||||
|
||||
-- Add a task item to the queue without invoking the runner, even if it is idle
|
||||
function runner_mt:enqueue(input)
|
||||
table.insert(self.queue, input);
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue