diff --git a/xqueue.lua b/xqueue.lua index 3fa1ca1..c7d45a2 100644 --- a/xqueue.lua +++ b/xqueue.lua @@ -219,6 +219,57 @@ local function _table2tuple ( qformat ) return dostring(fun) end +local function run_worker(space, xq, worker_num) + local worker = { + space = space; + xq = xq; + num = worker_num; + } + worker.fname = space.name .. '.xq.wrk' .. tostring(worker_num) + if package.reload then + worker.generation = package.reload.count + worker.fname = worker.fname .. '.' .. package.reload.count + end + xq.workers.registered[worker.num] = worker + worker.fiber = fiber.create(function(space, xq, worker_num, worker) + fiber.name(string.sub(worker.fname,1,32)) + repeat fiber.sleep(0.001) until space.xq + if worker.xq.ready then xq.ready:get() end + log.info("I am worker %s", worker_num) + + while box.space[space.name] and space.xq == xq and xq.workers.registered[worker.num] do + if xq.workers.handler then + local task = space:take(1) + if task then + local key = xq:getkey(task) + local r,e = pcall(xq.workers.handler, task) + + if not r then + log.error("Worker for {%s} has error: %s", key, e) + else + if xq.taken[ key ] then + space:ack(task) + end + end + + if xq.taken[ key ] then + log.error("Worker for {%s} not released task", key) + space:release(task) + end + end + end + fiber.sleep(1) + end + log.info("worker %s ended", worker_num) + end, space, xq, worker_num, worker) + + return setmetatable(worker, { + __serialize = function(self) + return ("[worker#%s] for %s"):format(self.num, self.space.name) + end; + }) +end + local methods = {} function M.upgrade(space,opts,depth) @@ -594,44 +645,20 @@ function M.upgrade(space,opts,depth) end if opts.worker then - local workers = opts.workers or 1 - local worker = opts.worker - for i = 1,workers do - fiber.create(function(space,xq,i) - local fname = space.name .. '.xq.wrk' .. tostring(i) - if package.reload then fname = fname .. '.' .. package.reload.count end - fiber.name(string.sub(fname,1,32)) - repeat fiber.sleep(0.001) until space.xq - if xq.ready then xq.ready:get() end - log.info("I am worker %s",i) - while box.space[space.name] and space.xq == xq do - local task = space:take(1) - if task then - local key = xq:getkey(task) - local r,e = pcall(worker,task) - if not r then - log.error("Worker for {%s} has error: %s", key, e) - else - if xq.taken[ key ] then - space:ack(task) - end - end - if xq.taken[ key ] then - log.error("Worker for {%s} not released task", key) - space:release(task) - end - end - fiber.sleep(1) - end - log.info("worker %s ended", i) - end,space,self,i) + self.workers = { + count = opts.workers or 1; + handler = opts.worker; + registered = {}; + } + for i = 1, self.workers.count do + run_worker(space, self, i) end end if have_runat then self.runat_chan = fiber.channel(0) self.runat = fiber.create(function(space,xq,runat_index) - local fname = space.name .. '.xq' + local fname = space.name .. '.xq.runat' if package.reload then fname = fname .. '.' .. package.reload.count end fiber.name(string.sub(fname,1,32)) repeat fiber.sleep(0.001) until space.xq @@ -1315,6 +1342,57 @@ function methods:stats(pretty) return stats end +function methods:set_worker(handler) + local xq = self.xq + if not xq.workers then + xq.workers = { + count = 1; + handler = handler; + registered = {}; + } + else + xq.workers.handler = handler + end + + for i = 1, xq.workers.count do + if not xq.workers.registered[i] then + run_worker(self, xq, i) + end + end + + return self +end + +function methods:set_workers_cnt(cnt) + assert(type(cnt) == 'number', "worker count must be a number") + + local xq = self.xq + local old_cnt = xq.workers and xq.workers.count or 0 + if not xq.workers then + xq.workers = { + handler = nil; + count = cnt; + registered = {}; + } + else + xq.workers.count = cnt + end + + if old_cnt > xq.workers.count then + for i = xq.workers.count + 1, old_cnt do + xq.workers.registered[i] = nil + end + elseif old_cnt < xq.workers.count then + for i = old_cnt, xq.workers.count do + if i ~= 0 then + run_worker(self, xq, i) + end + end + end + + return self +end + setmetatable(M,{ __call = function(M, space, opts) M.upgrade(space,opts,1)