diff --git a/queue/abstract.lua b/queue/abstract.lua index 74c97d62..c8da1f59 100644 --- a/queue/abstract.lua +++ b/queue/abstract.lua @@ -73,6 +73,7 @@ function tube.put(self, data, opts) end local conds = {} +local releasing_sessions = {} function tube.take(self, timeout) timeout = time(timeout or TIMEOUT_INFINITY) @@ -94,6 +95,13 @@ function tube.take(self, timeout) conds[fid]:free() box.space._queue_consumers:delete{ sid, fid } + -- We don't take a task if the session is in a + -- disconnecting state. + if releasing_sessions[fid] then + releasing_sessions[fid] = nil + return nil + end + task = self.raw:take() if task ~= nil then @@ -356,6 +364,7 @@ function method._on_consumer_disconnect() box.space._queue_consumers:delete{ waiter[1], waiter[2] } local cond = conds[waiter[2]] if cond then + releasing_sessions[waiter[2]] = true cond:signal(waiter[2]) end end diff --git a/t/120-take-task-after-reconnect.t b/t/120-take-task-after-reconnect.t new file mode 100755 index 00000000..f080108a --- /dev/null +++ b/t/120-take-task-after-reconnect.t @@ -0,0 +1,62 @@ +#!/usr/bin/env tarantool + +local fiber = require('fiber') +local netbox = require('net.box') +local os = require('os') +local queue = require('queue') +local tap = require('tap') +local tnt = require('t.tnt') + + +local test = tap.test('take a task after reconnect') +test:plan(1) + +local listen = 'localhost:1918' +tnt.cfg{listen = listen} + + +local function test_take_task_after_disconnect(test) + test:plan(1) + local driver = 'fifottl' + local tube = queue.create_tube('test_tube', driver, + {if_not_exists = true}) + rawset(_G, 'queue', require('queue')) + tube:grant('guest', {call = true}) + local task_id = tube:put('test_data')[1] + -- Now we have one task in a ready state + + local connection = netbox.connect(listen) + local fiber_1 = fiber.create(function() + connection:call('queue.tube.test_tube:take') + connection:call('queue.tube.test_tube:take') + end) + + -- This is not a best practice but we need to use the fiber.sleep() + -- (not fiber.yield()). + -- Expected results from a sleep() calling: + -- 1) Execute first connection:call('queue.tube.test_tube:take') + -- Now one task in a taken state + -- 2) Call the second connection:call('queue.tube.test_tube:take') + -- and to hang the fiber_1 + -- 3) Start a fiber on the server side of connection which will execute + -- second queue.tube.test_tube:take call and hang because the queue + -- is empty + fiber.sleep(0.1) + + connection:close() + + fiber.sleep(0.1) + -- The taken task will be released (cause - disconnection). + -- After that the fiber which waiting of a ready task (into take procedure) + -- will try to take this task (before the fix). + + + test:is(tube:peek(task_id)[2] == 'r', true, 'Task in ready state') +end + + +test:test('Don\'t take a task after disconnect', test_take_task_after_disconnect) + + +tnt.finish() +os.exit(test:check() and 0 or 1)