From 0a0242080cb56d1f8f9f0214f31b85abdf473304 Mon Sep 17 00:00:00 2001 From: Leonid Vasiliev Date: Wed, 18 Dec 2019 11:22:45 +0300 Subject: [PATCH 1/2] Release all taken tasks on start All taken tasks will be released after the server restart If some tasks have been taken and don't released before shutdown of the tarantool instance (for example: tarantool instance has been killed) such task go to 'hung' state (noone can take the task now). So, we must release all taken tasks on start of the queue module. Fixes #66 --- README.md | 1 + queue/abstract.lua | 26 +++++++++++++++++++++++- queue/abstract/driver/fifo.lua | 5 +++++ queue/abstract/driver/fifottl.lua | 5 +++++ queue/abstract/driver/utube.lua | 5 +++++ queue/abstract/driver/utubettl.lua | 5 +++++ t/010-fifo.t | 32 +++++++++++++++++++++++++++++- t/020-fifottl.t | 31 ++++++++++++++++++++++++++++- t/030-utube.t | 32 +++++++++++++++++++++++++++++- t/040-utubettl.t | 32 +++++++++++++++++++++++++++++- 10 files changed, 169 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 013a03cd..b08e42e6 100644 --- a/README.md +++ b/README.md @@ -637,6 +637,7 @@ If there are no 'ready' tasks in the queue, returns nil. * `tube:bury(task_id)` - buries a task * `tube:kick(count)` - digs out `count` tasks * `tube:peek(task_id)` - return the task state by ID +* `tube:tasks_by_state(task_state)` - return the iterator to tasks in a certain state * `tube:truncate()` - delete all tasks from the tube. Note that `tube:truncate` must be called only by the user who created this tube (has space ownership) OR under a `setuid` function. Read more about `setuid` functions diff --git a/queue/abstract.lua b/queue/abstract.lua index 74c97d62..5866ae71 100644 --- a/queue/abstract.lua +++ b/queue/abstract.lua @@ -63,6 +63,26 @@ queue.driver = { limfifottl = require('queue.abstract.driver.limfifottl') } +local function tube_release_all_tasks(tube) + local prefix = ('queue: [tube "%s"] '):format(tube.name) + + -- We lean on stable iterators in this function. + -- https://github.com/tarantool/tarantool/issues/1796 + if not qc.check_version({1, 7, 5}) then + log.error(prefix .. 'no stable iterator support: skip task releasing') + log.error(prefix .. 'some tasks may stuck in taken state perpetually') + log.error(prefix .. 'update tarantool to >= 1.7.5 or take the risk') + end + + log.info(prefix .. 'releasing all taken task (may take a while)') + local released = 0 + for _, task in tube.raw:tasks_by_state(state.TAKEN) do + tube.raw:release(task[1]) + released = released + 1 + end + log.info(prefix .. ('released %d tasks'):format(released)) +end + -- tube methods local tube = {} @@ -515,7 +535,11 @@ function method.start() }) end - _queue:pairs():each(recreate_tube) + for _, tube_tuple in _queue:pairs() do + local tube = recreate_tube(tube_tuple) + -- gh-66: release all taken tasks on start + tube_release_all_tasks(tube) + end session.on_disconnect(queue._on_consumer_disconnect) return queue diff --git a/queue/abstract/driver/fifo.lua b/queue/abstract/driver/fifo.lua index 2440cb94..8b4e9544 100644 --- a/queue/abstract/driver/fifo.lua +++ b/queue/abstract/driver/fifo.lua @@ -121,6 +121,11 @@ function method.peek(self, id) return self.space:get{id} end +-- get iterator to tasks in a certain state +function method.tasks_by_state(self, task_state) + return self.space.index.status:pairs(task_state) +end + function method.truncate(self) self.space:truncate() end diff --git a/queue/abstract/driver/fifottl.lua b/queue/abstract/driver/fifottl.lua index bacfa854..7e095e8e 100644 --- a/queue/abstract/driver/fifottl.lua +++ b/queue/abstract/driver/fifottl.lua @@ -348,6 +348,11 @@ function method.peek(self, id) return self.space:get{id} end +-- get iterator to tasks in a certain state +function method.tasks_by_state(self, task_state) + return self.space.index.status:pairs(task_state) +end + function method.truncate(self) self.space:truncate() end diff --git a/queue/abstract/driver/utube.lua b/queue/abstract/driver/utube.lua index bca41b79..3a42c64e 100644 --- a/queue/abstract/driver/utube.lua +++ b/queue/abstract/driver/utube.lua @@ -151,6 +151,11 @@ function method.peek(self, id) return self.space:get{id} end +-- get iterator to tasks in a certain state +function method.tasks_by_state(self, task_state) + return self.space.index.status:pairs(task_state) +end + function method.truncate(self) self.space:truncate() end diff --git a/queue/abstract/driver/utubettl.lua b/queue/abstract/driver/utubettl.lua index 83ea5b7c..02451dc6 100644 --- a/queue/abstract/driver/utubettl.lua +++ b/queue/abstract/driver/utubettl.lua @@ -370,6 +370,11 @@ function method.peek(self, id) return self.space:get{id} end +-- get iterator to tasks in a certain state +function method.tasks_by_state(self, task_state) + return self.space.index.status:pairs(task_state) +end + function method.truncate(self) self.space:truncate() end diff --git a/t/010-fifo.t b/t/010-fifo.t index 300b892f..f0d90f89 100755 --- a/t/010-fifo.t +++ b/t/010-fifo.t @@ -3,7 +3,7 @@ local yaml = require('yaml') local fiber = require('fiber') local test = require('tap').test() -test:plan(14) +test:plan(15) local queue = require('queue') local state = require('queue.abstract.state') @@ -292,6 +292,36 @@ test:test('if_not_exists test', function(test) test:isnt(tube, tube_new, "if_not_exists if tube doesn't exists") end) +test:test('Get tasks by state test', function(test) + test:plan(2) + local tube = queue.create_tube('test_task_it', 'fifo') + + for i = 1, 10 do + tube:put('test_data' .. tostring(i)) + end + for i = 1, 4 do + tube:take(0.001) + end + + local count_taken = 0 + local count_ready = 0 + + for _, task in tube.raw:tasks_by_state(state.READY) do + if task[2] == state.READY then + count_ready = count_ready + 1 + end + end + + for _, task in tube.raw:tasks_by_state(state.TAKEN) do + if task[2] == state.TAKEN then + count_taken = count_taken + 1 + end + end + + test:is(count_ready, 6, 'Check tasks count in a ready state') + test:is(count_taken, 4, 'Check tasks count in a taken state') +end) + tnt.finish() os.exit(test:check() == true and 0 or -1) -- vim: set ft=lua : diff --git a/t/020-fifottl.t b/t/020-fifottl.t index dfeceafc..bebe6e00 100755 --- a/t/020-fifottl.t +++ b/t/020-fifottl.t @@ -2,7 +2,7 @@ local fiber = require('fiber') local test = require('tap').test() -test:plan(15) +test:plan(16) local queue = require('queue') local state = require('queue.abstract.state') @@ -262,6 +262,35 @@ test:test('buried task in a dropped queue', function(test) test:ok(true, 'queue does not hang') end) +test:test('Get tasks by state test', function(test) + test:plan(2) + local tube = queue.create_tube('test_task_it', 'fifottl') + + for i = 1, 10 do + tube:put('test_data' .. tostring(i)) + end + for i = 1, 4 do + tube:take(0.001) + end + + local count_taken = 0 + local count_ready = 0 + + for _, task in tube.raw:tasks_by_state(state.READY) do + if task[2] == state.READY then + count_ready = count_ready + 1 + end + end + + for _, task in tube.raw:tasks_by_state(state.TAKEN) do + if task[2] == state.TAKEN then + count_taken = count_taken + 1 + end + end + + test:is(count_ready, 6, 'Check tasks count in a ready state') + test:is(count_taken, 4, 'Check tasks count in a taken state') +end) tnt.finish() os.exit(test:check() == true and 0 or -1) diff --git a/t/030-utube.t b/t/030-utube.t index 384db5ce..4b1802a2 100755 --- a/t/030-utube.t +++ b/t/030-utube.t @@ -3,7 +3,7 @@ local yaml = require('yaml') local fiber = require('fiber') local test = (require('tap')).test() -test:plan(11) +test:plan(12) local queue = require('queue') local state = require('queue.abstract.state') @@ -156,6 +156,36 @@ test:test('if_not_exists test', function(test) test:isnt(tube, tube_new, "if_not_exists if tube doesn't exists") end) +test:test('Get tasks by state test', function(test) + test:plan(2) + local tube = queue.create_tube('test_task_it', 'utube') + + for i = 1, 10 do + tube:put('test_data' .. tostring(i), { utube = i }) + end + for i = 1, 4 do + tube:take(0.001) + end + + local count_taken = 0 + local count_ready = 0 + + for _, task in tube.raw:tasks_by_state(state.READY) do + if task[2] == state.READY then + count_ready = count_ready + 1 + end + end + + for _, task in tube.raw:tasks_by_state(state.TAKEN) do + if task[2] == state.TAKEN then + count_taken = count_taken + 1 + end + end + + test:is(count_ready, 6, 'Check tasks count in a ready state') + test:is(count_taken, 4, 'Check tasks count in a taken state') +end) + tnt.finish() os.exit(test:check() == true and 0 or -1) -- vim: set ft=lua : diff --git a/t/040-utubettl.t b/t/040-utubettl.t index a9d37f10..774e1657 100755 --- a/t/040-utubettl.t +++ b/t/040-utubettl.t @@ -3,7 +3,7 @@ local yaml = require('yaml') local fiber = require('fiber') local test = (require('tap')).test() -test:plan(16) +test:plan(17) local queue = require('queue') local state = require('queue.abstract.state') @@ -257,6 +257,36 @@ test:test('ttl after delay test', function(test) test:is(task.ttr, TTR * 1000000, 'check TTR after release') end) +test:test('Get tasks by state test', function(test) + test:plan(2) + local tube = queue.create_tube('test_task_it', 'utubettl') + + for i = 1, 10 do + tube:put('test_data' .. tostring(i), { utube = i }) + end + for i = 1, 4 do + tube:take(0.001) + end + + local count_taken = 0 + local count_ready = 0 + + for _, task in tube.raw:tasks_by_state(state.READY) do + if task[2] == state.READY then + count_ready = count_ready + 1 + end + end + + for _, task in tube.raw:tasks_by_state(state.TAKEN) do + if task[2] == state.TAKEN then + count_taken = count_taken + 1 + end + end + + test:is(count_ready, 6, 'Check tasks count in a ready state') + test:is(count_taken, 4, 'Check tasks count in a taken state') +end) + tnt.finish() os.exit(test:check() == true and 0 or -1) -- vim: set ft=lua : From 471cc030ca8ed23dce6a00f2c878fae106a57245 Mon Sep 17 00:00:00 2001 From: Leonid Vasiliev Date: Thu, 27 Feb 2020 18:40:07 +0300 Subject: [PATCH 2/2] test: replace the tests exit code in case of error from 255 to 1 --- t/000-init.t | 2 +- t/001-tube-init.t | 2 +- t/010-fifo.t | 2 +- t/020-fifottl.t | 2 +- t/030-utube.t | 2 +- t/040-utubettl.t | 2 +- t/050-ttl.t | 2 +- t/060-async.t | 2 +- t/070-compat.t | 2 +- t/080-otc-cb.t | 2 +- t/090-grant-check.t | 2 +- t/100-limfifottl.t | 2 +- 12 files changed, 12 insertions(+), 12 deletions(-) diff --git a/t/000-init.t b/t/000-init.t index 537e38e6..d988671a 100755 --- a/t/000-init.t +++ b/t/000-init.t @@ -43,5 +43,5 @@ test:test('access to queue after box.cfg{}', function(test) end) tnt.finish() -os.exit(test:check() == true and 0 or -1) +os.exit(test:check() and 0 or 1) -- vim: set ft=lua : diff --git a/t/001-tube-init.t b/t/001-tube-init.t index 77de0309..9f03c421 100755 --- a/t/001-tube-init.t +++ b/t/001-tube-init.t @@ -23,5 +23,5 @@ test:test('test queue mock addition', function(test) end) tnt.finish() -os.exit(test:check() == true and 0 or -1) +os.exit(test:check() and 0 or 1) -- vim: set ft=lua : diff --git a/t/010-fifo.t b/t/010-fifo.t index f0d90f89..834176be 100755 --- a/t/010-fifo.t +++ b/t/010-fifo.t @@ -323,5 +323,5 @@ test:test('Get tasks by state test', function(test) end) tnt.finish() -os.exit(test:check() == true and 0 or -1) +os.exit(test:check() and 0 or 1) -- vim: set ft=lua : diff --git a/t/020-fifottl.t b/t/020-fifottl.t index bebe6e00..4a94769e 100755 --- a/t/020-fifottl.t +++ b/t/020-fifottl.t @@ -293,5 +293,5 @@ test:test('Get tasks by state test', function(test) end) tnt.finish() -os.exit(test:check() == true and 0 or -1) +os.exit(test:check() and 0 or 1) -- vim: set ft=lua : diff --git a/t/030-utube.t b/t/030-utube.t index 4b1802a2..ffd92136 100755 --- a/t/030-utube.t +++ b/t/030-utube.t @@ -187,5 +187,5 @@ test:test('Get tasks by state test', function(test) end) tnt.finish() -os.exit(test:check() == true and 0 or -1) +os.exit(test:check() and 0 or 1) -- vim: set ft=lua : diff --git a/t/040-utubettl.t b/t/040-utubettl.t index 774e1657..5af2d9f1 100755 --- a/t/040-utubettl.t +++ b/t/040-utubettl.t @@ -288,5 +288,5 @@ test:test('Get tasks by state test', function(test) end) tnt.finish() -os.exit(test:check() == true and 0 or -1) +os.exit(test:check() and 0 or 1) -- vim: set ft=lua : diff --git a/t/050-ttl.t b/t/050-ttl.t index ece3d1be..4f76fd7e 100755 --- a/t/050-ttl.t +++ b/t/050-ttl.t @@ -58,5 +58,5 @@ test:test('many messages, one queue utttl', function (test) end) tnt.finish() -os.exit(test:check() == true and 0 or -1) +os.exit(test:check() and 0 or 1) -- vim: set ft=lua : diff --git a/t/060-async.t b/t/060-async.t index 37f7ca38..8737501a 100755 --- a/t/060-async.t +++ b/t/060-async.t @@ -54,5 +54,5 @@ end) tnt.finish() -os.exit(test:check() == true and 0 or -1) +os.exit(test:check() and 0 or 1) -- vim: set ft=lua: diff --git a/t/070-compat.t b/t/070-compat.t index 7b3e3456..a78b5dd0 100755 --- a/t/070-compat.t +++ b/t/070-compat.t @@ -42,5 +42,5 @@ test:test("check compatibility names", function(test) test:is(str_name("1.7.1-168"), "str", "check old name (str)") end) -os.exit(test:check() == true and 0 or -1) +os.exit(test:check() and 0 or 1) -- vim: set ft=lua: diff --git a/t/080-otc-cb.t b/t/080-otc-cb.t index fcd8b958..7a6b5d70 100755 --- a/t/080-otc-cb.t +++ b/t/080-otc-cb.t @@ -45,5 +45,5 @@ test:test('on_task_change callback', function(test) end) tnt.finish() -os.exit(test:check() == true and 0 or -1) +os.exit(test:check() and 0 or 1) -- vim: set ft=lua : diff --git a/t/090-grant-check.t b/t/090-grant-check.t index 084ddcfa..110b64e5 100755 --- a/t/090-grant-check.t +++ b/t/090-grant-check.t @@ -166,5 +166,5 @@ end) tnt.finish() -os.exit(test:check() == true and 0 or -1) +os.exit(test:check() and 0 or 1) -- vim: set ft=lua : diff --git a/t/100-limfifottl.t b/t/100-limfifottl.t index 32aee3d9..e78b0649 100755 --- a/t/100-limfifottl.t +++ b/t/100-limfifottl.t @@ -68,5 +68,5 @@ else end tnt.finish() -os.exit(test:check() == true and 0 or -1) +os.exit(test:check() and 0 or 1) -- vim: set ft=lua :