diff --git a/README.md b/README.md index 013a03c..b08e42e 100644 --- a/README.md +++ b/README.md @@ -637,6 +637,7 @@ If there are no 'ready' tasks in the queue, returns nil. * `tube:bury(task_id)` - buries a task * `tube:kick(count)` - digs out `count` tasks * `tube:peek(task_id)` - return the task state by ID +* `tube:tasks_by_state(task_state)` - return the iterator to tasks in a certain state * `tube:truncate()` - delete all tasks from the tube. Note that `tube:truncate` must be called only by the user who created this tube (has space ownership) OR under a `setuid` function. Read more about `setuid` functions diff --git a/queue/abstract.lua b/queue/abstract.lua index 74c97d6..5866ae7 100644 --- a/queue/abstract.lua +++ b/queue/abstract.lua @@ -63,6 +63,26 @@ queue.driver = { limfifottl = require('queue.abstract.driver.limfifottl') } +local function tube_release_all_tasks(tube) + local prefix = ('queue: [tube "%s"] '):format(tube.name) + + -- We lean on stable iterators in this function. + -- https://github.com/tarantool/tarantool/issues/1796 + if not qc.check_version({1, 7, 5}) then + log.error(prefix .. 'no stable iterator support: skip task releasing') + log.error(prefix .. 'some tasks may stuck in taken state perpetually') + log.error(prefix .. 'update tarantool to >= 1.7.5 or take the risk') + end + + log.info(prefix .. 'releasing all taken task (may take a while)') + local released = 0 + for _, task in tube.raw:tasks_by_state(state.TAKEN) do + tube.raw:release(task[1]) + released = released + 1 + end + log.info(prefix .. ('released %d tasks'):format(released)) +end + -- tube methods local tube = {} @@ -515,7 +535,11 @@ function method.start() }) end - _queue:pairs():each(recreate_tube) + for _, tube_tuple in _queue:pairs() do + local tube = recreate_tube(tube_tuple) + -- gh-66: release all taken tasks on start + tube_release_all_tasks(tube) + end session.on_disconnect(queue._on_consumer_disconnect) return queue diff --git a/queue/abstract/driver/fifo.lua b/queue/abstract/driver/fifo.lua index 2440cb9..8b4e954 100644 --- a/queue/abstract/driver/fifo.lua +++ b/queue/abstract/driver/fifo.lua @@ -121,6 +121,11 @@ function method.peek(self, id) return self.space:get{id} end +-- get iterator to tasks in a certain state +function method.tasks_by_state(self, task_state) + return self.space.index.status:pairs(task_state) +end + function method.truncate(self) self.space:truncate() end diff --git a/queue/abstract/driver/fifottl.lua b/queue/abstract/driver/fifottl.lua index bacfa85..7e095e8 100644 --- a/queue/abstract/driver/fifottl.lua +++ b/queue/abstract/driver/fifottl.lua @@ -348,6 +348,11 @@ function method.peek(self, id) return self.space:get{id} end +-- get iterator to tasks in a certain state +function method.tasks_by_state(self, task_state) + return self.space.index.status:pairs(task_state) +end + function method.truncate(self) self.space:truncate() end diff --git a/queue/abstract/driver/utube.lua b/queue/abstract/driver/utube.lua index bca41b7..3a42c64 100644 --- a/queue/abstract/driver/utube.lua +++ b/queue/abstract/driver/utube.lua @@ -151,6 +151,11 @@ function method.peek(self, id) return self.space:get{id} end +-- get iterator to tasks in a certain state +function method.tasks_by_state(self, task_state) + return self.space.index.status:pairs(task_state) +end + function method.truncate(self) self.space:truncate() end diff --git a/queue/abstract/driver/utubettl.lua b/queue/abstract/driver/utubettl.lua index 83ea5b7..02451dc 100644 --- a/queue/abstract/driver/utubettl.lua +++ b/queue/abstract/driver/utubettl.lua @@ -370,6 +370,11 @@ function method.peek(self, id) return self.space:get{id} end +-- get iterator to tasks in a certain state +function method.tasks_by_state(self, task_state) + return self.space.index.status:pairs(task_state) +end + function method.truncate(self) self.space:truncate() end diff --git a/t/000-init.t b/t/000-init.t index 537e38e..d988671 100755 --- a/t/000-init.t +++ b/t/000-init.t @@ -43,5 +43,5 @@ test:test('access to queue after box.cfg{}', function(test) end) tnt.finish() -os.exit(test:check() == true and 0 or -1) +os.exit(test:check() and 0 or 1) -- vim: set ft=lua : diff --git a/t/001-tube-init.t b/t/001-tube-init.t index 77de030..9f03c42 100755 --- a/t/001-tube-init.t +++ b/t/001-tube-init.t @@ -23,5 +23,5 @@ test:test('test queue mock addition', function(test) end) tnt.finish() -os.exit(test:check() == true and 0 or -1) +os.exit(test:check() and 0 or 1) -- vim: set ft=lua : diff --git a/t/010-fifo.t b/t/010-fifo.t index 300b892..834176b 100755 --- a/t/010-fifo.t +++ b/t/010-fifo.t @@ -3,7 +3,7 @@ local yaml = require('yaml') local fiber = require('fiber') local test = require('tap').test() -test:plan(14) +test:plan(15) local queue = require('queue') local state = require('queue.abstract.state') @@ -292,6 +292,36 @@ test:test('if_not_exists test', function(test) test:isnt(tube, tube_new, "if_not_exists if tube doesn't exists") end) +test:test('Get tasks by state test', function(test) + test:plan(2) + local tube = queue.create_tube('test_task_it', 'fifo') + + for i = 1, 10 do + tube:put('test_data' .. tostring(i)) + end + for i = 1, 4 do + tube:take(0.001) + end + + local count_taken = 0 + local count_ready = 0 + + for _, task in tube.raw:tasks_by_state(state.READY) do + if task[2] == state.READY then + count_ready = count_ready + 1 + end + end + + for _, task in tube.raw:tasks_by_state(state.TAKEN) do + if task[2] == state.TAKEN then + count_taken = count_taken + 1 + end + end + + test:is(count_ready, 6, 'Check tasks count in a ready state') + test:is(count_taken, 4, 'Check tasks count in a taken state') +end) + tnt.finish() -os.exit(test:check() == true and 0 or -1) +os.exit(test:check() and 0 or 1) -- vim: set ft=lua : diff --git a/t/020-fifottl.t b/t/020-fifottl.t index dfeceaf..4a94769 100755 --- a/t/020-fifottl.t +++ b/t/020-fifottl.t @@ -2,7 +2,7 @@ local fiber = require('fiber') local test = require('tap').test() -test:plan(15) +test:plan(16) local queue = require('queue') local state = require('queue.abstract.state') @@ -262,7 +262,36 @@ test:test('buried task in a dropped queue', function(test) test:ok(true, 'queue does not hang') end) +test:test('Get tasks by state test', function(test) + test:plan(2) + local tube = queue.create_tube('test_task_it', 'fifottl') + + for i = 1, 10 do + tube:put('test_data' .. tostring(i)) + end + for i = 1, 4 do + tube:take(0.001) + end + + local count_taken = 0 + local count_ready = 0 + + for _, task in tube.raw:tasks_by_state(state.READY) do + if task[2] == state.READY then + count_ready = count_ready + 1 + end + end + + for _, task in tube.raw:tasks_by_state(state.TAKEN) do + if task[2] == state.TAKEN then + count_taken = count_taken + 1 + end + end + + test:is(count_ready, 6, 'Check tasks count in a ready state') + test:is(count_taken, 4, 'Check tasks count in a taken state') +end) tnt.finish() -os.exit(test:check() == true and 0 or -1) +os.exit(test:check() and 0 or 1) -- vim: set ft=lua : diff --git a/t/030-utube.t b/t/030-utube.t index 384db5c..ffd9213 100755 --- a/t/030-utube.t +++ b/t/030-utube.t @@ -3,7 +3,7 @@ local yaml = require('yaml') local fiber = require('fiber') local test = (require('tap')).test() -test:plan(11) +test:plan(12) local queue = require('queue') local state = require('queue.abstract.state') @@ -156,6 +156,36 @@ test:test('if_not_exists test', function(test) test:isnt(tube, tube_new, "if_not_exists if tube doesn't exists") end) +test:test('Get tasks by state test', function(test) + test:plan(2) + local tube = queue.create_tube('test_task_it', 'utube') + + for i = 1, 10 do + tube:put('test_data' .. tostring(i), { utube = i }) + end + for i = 1, 4 do + tube:take(0.001) + end + + local count_taken = 0 + local count_ready = 0 + + for _, task in tube.raw:tasks_by_state(state.READY) do + if task[2] == state.READY then + count_ready = count_ready + 1 + end + end + + for _, task in tube.raw:tasks_by_state(state.TAKEN) do + if task[2] == state.TAKEN then + count_taken = count_taken + 1 + end + end + + test:is(count_ready, 6, 'Check tasks count in a ready state') + test:is(count_taken, 4, 'Check tasks count in a taken state') +end) + tnt.finish() -os.exit(test:check() == true and 0 or -1) +os.exit(test:check() and 0 or 1) -- vim: set ft=lua : diff --git a/t/040-utubettl.t b/t/040-utubettl.t index a9d37f1..5af2d9f 100755 --- a/t/040-utubettl.t +++ b/t/040-utubettl.t @@ -3,7 +3,7 @@ local yaml = require('yaml') local fiber = require('fiber') local test = (require('tap')).test() -test:plan(16) +test:plan(17) local queue = require('queue') local state = require('queue.abstract.state') @@ -257,6 +257,36 @@ test:test('ttl after delay test', function(test) test:is(task.ttr, TTR * 1000000, 'check TTR after release') end) +test:test('Get tasks by state test', function(test) + test:plan(2) + local tube = queue.create_tube('test_task_it', 'utubettl') + + for i = 1, 10 do + tube:put('test_data' .. tostring(i), { utube = i }) + end + for i = 1, 4 do + tube:take(0.001) + end + + local count_taken = 0 + local count_ready = 0 + + for _, task in tube.raw:tasks_by_state(state.READY) do + if task[2] == state.READY then + count_ready = count_ready + 1 + end + end + + for _, task in tube.raw:tasks_by_state(state.TAKEN) do + if task[2] == state.TAKEN then + count_taken = count_taken + 1 + end + end + + test:is(count_ready, 6, 'Check tasks count in a ready state') + test:is(count_taken, 4, 'Check tasks count in a taken state') +end) + tnt.finish() -os.exit(test:check() == true and 0 or -1) +os.exit(test:check() and 0 or 1) -- vim: set ft=lua : diff --git a/t/050-ttl.t b/t/050-ttl.t index ece3d1b..4f76fd7 100755 --- a/t/050-ttl.t +++ b/t/050-ttl.t @@ -58,5 +58,5 @@ test:test('many messages, one queue utttl', function (test) end) tnt.finish() -os.exit(test:check() == true and 0 or -1) +os.exit(test:check() and 0 or 1) -- vim: set ft=lua : diff --git a/t/060-async.t b/t/060-async.t index 37f7ca3..8737501 100755 --- a/t/060-async.t +++ b/t/060-async.t @@ -54,5 +54,5 @@ end) tnt.finish() -os.exit(test:check() == true and 0 or -1) +os.exit(test:check() and 0 or 1) -- vim: set ft=lua: diff --git a/t/070-compat.t b/t/070-compat.t index 7b3e345..a78b5dd 100755 --- a/t/070-compat.t +++ b/t/070-compat.t @@ -42,5 +42,5 @@ test:test("check compatibility names", function(test) test:is(str_name("1.7.1-168"), "str", "check old name (str)") end) -os.exit(test:check() == true and 0 or -1) +os.exit(test:check() and 0 or 1) -- vim: set ft=lua: diff --git a/t/080-otc-cb.t b/t/080-otc-cb.t index fcd8b95..7a6b5d7 100755 --- a/t/080-otc-cb.t +++ b/t/080-otc-cb.t @@ -45,5 +45,5 @@ test:test('on_task_change callback', function(test) end) tnt.finish() -os.exit(test:check() == true and 0 or -1) +os.exit(test:check() and 0 or 1) -- vim: set ft=lua : diff --git a/t/090-grant-check.t b/t/090-grant-check.t index 084ddcf..110b64e 100755 --- a/t/090-grant-check.t +++ b/t/090-grant-check.t @@ -166,5 +166,5 @@ end) tnt.finish() -os.exit(test:check() == true and 0 or -1) +os.exit(test:check() and 0 or 1) -- vim: set ft=lua : diff --git a/t/100-limfifottl.t b/t/100-limfifottl.t index 32aee3d..e78b064 100755 --- a/t/100-limfifottl.t +++ b/t/100-limfifottl.t @@ -68,5 +68,5 @@ else end tnt.finish() -os.exit(test:check() == true and 0 or -1) +os.exit(test:check() and 0 or 1) -- vim: set ft=lua :