From c88fdeafedc504d54d8519c7eb43cc63cd297c35 Mon Sep 17 00:00:00 2001 From: Pavel Balaev Date: Mon, 6 Mar 2023 19:24:55 +0300 Subject: [PATCH] replicaset_mode: fix queue.tube indexes In replicaset mode, tubes can be created and deleted on different nodes. Accordingly, it is necessary to rebuild the queue.tube index. Closes #202 --- queue/abstract.lua | 45 ++++++++++++++++++++++------------- t/200-master-replica.t | 53 +++++++++++++++++++++++++++++++++++++++++- t/tnt/init.lua | 2 +- 3 files changed, 82 insertions(+), 18 deletions(-) diff --git a/queue/abstract.lua b/queue/abstract.lua index 8b443cf4..e928ba43 100644 --- a/queue/abstract.lua +++ b/queue/abstract.lua @@ -520,10 +520,39 @@ function method._on_consumer_disconnect() session.disconnect(conn_id) end +-- function takes tuples and recreates tube +local function recreate_tube(tube_tuple) + local name, id, space_name, tube_type, opts = tube_tuple:unpack() + + local driver = queue.driver[tube_type] + if driver == nil then + error("Unknown tube type " .. tostring(tube_type)) + end + + local space = box.space[space_name] + if space == nil then + error(("Space '%s' doesn't exists"):format(space_name)) + end + return make_self(driver, space, name, tube_type, id, opts) +end + -- Function takes new queue state. -- The "RUNNING" and "WAITING" states do not require additional actions. local function on_state_change(state) if state == queue_state.states.STARTUP then + local replicaset_mode = queue.cfg['in_replicaset'] or false + -- gh-202: In replicaset mode, tubes can be created and deleted on different nodes. + -- Accordingly, it is necessary to rebuild the queue.tube index. + if replicaset_mode then + for _, tube_name in pairs(queue.tube()) do + queue.tube[tube_name] = nil + end + for _, tube_tuple in box.space._queue:pairs() do + if queue.driver[tube_tuple[4]] ~= nil then + recreate_tube(tube_tuple) + end + end + end for name, tube in pairs(queue.tube) do tube_release_all_orphaned_tasks(tube) log.info('queue: [tube "%s"] start driver', name) @@ -551,22 +580,6 @@ local function on_state_change(state) end end --- function takes tuples and recreates tube -local function recreate_tube(tube_tuple) - local name, id, space_name, tube_type, opts = tube_tuple:unpack() - - local driver = queue.driver[tube_type] - if driver == nil then - error("Unknown tube type " .. tostring(tube_type)) - end - - local space = box.space[space_name] - if space == nil then - error(("Space '%s' doesn't exists"):format(space_name)) - end - return make_self(driver, space, name, tube_type, id, opts) -end - ------------------------------------------------------------------------------- -- create tube function method.create_tube(tube_name, tube_type, opts) diff --git a/t/200-master-replica.t b/t/200-master-replica.t index acbb7213..5c078948 100755 --- a/t/200-master-replica.t +++ b/t/200-master-replica.t @@ -21,7 +21,7 @@ end -- Replica connection handler. local conn = {} -test:plan(7) +test:plan(8) test:test('Check master-replica setup', function(test) test:plan(8) @@ -337,6 +337,57 @@ test:test('Check in_replicaset switching', function(test) 'taken tasks count after release_all') end) +-- gh-202 +test:test('Check that tubes indexes is actual after role change', function(test) + local engine = os.getenv('ENGINE') or 'memtx' + test:plan(10) + box.cfg{read_only = true} + queue_state.poll(queue_state.states.WAITING, 10) + test:is(queue.state(), 'WAITING', 'master state is waiting') + conn:eval('box.cfg{read_only=false}') + conn:eval([[ + queue_state = require('queue.abstract.queue_state') + queue_state.poll(queue_state.states.RUNNING, 10) + ]]) + test:is(conn:call('queue.state'), 'RUNNING', 'replica state is running') + conn:eval([[queue.create_tube('repl_tube', 'fifo', {engine =]] .. engine .. [[})]]) + + -- Switch roles back. + conn:eval('box.cfg{read_only=true}') + conn:eval([[ + queue_state = require('queue.abstract.queue_state') + queue_state.poll(queue_state.states.WAITING, 10) + ]]) + box.cfg{read_only = false} + queue_state.poll(queue_state.states.RUNNING, 10) + test:is(queue.state(), 'RUNNING', 'master state is running') + test:is(conn:call('queue.state'), 'WAITING', 'replica state is waiting') + test:ok(queue.tube.repl_tube, 'repl_tube is accessible') + + box.cfg{read_only = true} + queue_state.poll(queue_state.states.WAITING, 10) + test:is(queue.state(), 'WAITING', 'master state is waiting') + conn:eval('box.cfg{read_only=false}') + conn:eval([[ + queue_state = require('queue.abstract.queue_state') + queue_state.poll(queue_state.states.RUNNING, 10) + ]]) + test:is(conn:call('queue.state'), 'RUNNING', 'replica state is running') + conn:eval('queue.tube.repl_tube:drop()') + + -- Switch roles back. + conn:eval('box.cfg{read_only=true}') + conn:eval([[ + queue_state = require('queue.abstract.queue_state') + queue_state.poll(queue_state.states.WAITING, 10) + ]]) + box.cfg{read_only = false} + queue_state.poll(queue_state.states.RUNNING, 10) + test:is(queue.state(), 'RUNNING', 'master state is running') + test:is(conn:call('queue.state'), 'WAITING', 'replica state is waiting') + test:isnil(queue.tube.repl_tube, "repl_tube is not indexed") +end) + rawset(_G, 'queue', nil) conn:eval('rawset(_G, "queue", nil)') conn:close() diff --git a/t/tnt/init.lua b/t/tnt/init.lua index 1d488e3e..ed85ef19 100644 --- a/t/tnt/init.lua +++ b/t/tnt/init.lua @@ -87,7 +87,7 @@ local function tnt_cluster_prepare(cfg_args) box.cfg(cfg_args) -- Allow guest all operations. - box.schema.user.grant('guest', 'read, write, execute', 'universe') + box.schema.user.grant('guest', 'read, write, execute, create, drop', 'universe') box.schema.user.create('replicator', {password = 'password'}) box.schema.user.grant('replicator', 'replication')