diff --git a/src/box/lua/info.c b/src/box/lua/info.c index 44d28a1725ee6ea6bfc87ea32194d42b672e015b..e46b8b87e3fe55d0d181ded8bbe2504ed511238c 100644 --- a/src/box/lua/info.c +++ b/src/box/lua/info.c @@ -731,6 +731,15 @@ lbox_info_synchro(struct lua_State *L) lua_setfield(L, -2, "busy"); luaL_pushuint64(L, queue->promote_greatest_term); lua_setfield(L, -2, "term"); + if (queue->len == 0) { + lua_pushnumber(L, 0); + } else { + struct txn_limbo_entry *oldest_entry = + txn_limbo_first_entry(queue); + double now = fiber_clock(); + lua_pushnumber(L, now - oldest_entry->insertion_time); + } + lua_setfield(L, -2, "age"); lua_setfield(L, -2, "queue"); return 1; diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c index 00618bb94424dbef7b34077cc452634c0ba041a0..f182bf4e83b8f7813c72dc17782548d4114e810e 100644 --- a/src/box/txn_limbo.c +++ b/src/box/txn_limbo.c @@ -170,6 +170,7 @@ txn_limbo_append(struct txn_limbo *limbo, uint32_t id, struct txn *txn) e->ack_count = 0; e->is_commit = false; e->is_rollback = false; + e->insertion_time = fiber_clock(); rlist_add_tail_entry(&limbo->queue, e, in_queue); limbo->len++; return e; diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h index ce93c9d10c100476addf324a474d81ae07423085..307e8b7a38ad5a07e07a2d060be12c5470575691 100644 --- a/src/box/txn_limbo.h +++ b/src/box/txn_limbo.h @@ -69,6 +69,8 @@ struct txn_limbo_entry { */ bool is_commit; bool is_rollback; + /** When this entry was added to the queue. */ + double insertion_time; }; static inline bool diff --git a/test/replication-luatest/gh_9918_synchro_queue_additional_info_test.lua b/test/replication-luatest/gh_9918_synchro_queue_additional_info_test.lua new file mode 100644 index 0000000000000000000000000000000000000000..5bc73cd8ae04f7094761282104971db3432bb905 --- /dev/null +++ b/test/replication-luatest/gh_9918_synchro_queue_additional_info_test.lua @@ -0,0 +1,112 @@ +local fiber = require('fiber') +local t = require('luatest') +local replica_set = require('luatest.replica_set') +local server = require('luatest.server') + +local g = t.group() + +g.before_each(function(cg) + cg.replica_set = replica_set:new{} + local box_cfg = { + replication = { + server.build_listen_uri('server1', cg.replica_set.id), + server.build_listen_uri('server2', cg.replica_set.id), + }, + replication_timeout = 0.1, + replication_synchro_timeout = 60, + } + cg.leader = cg.replica_set:build_and_add_server{ + alias = 'server1', + box_cfg = box_cfg, + } + cg.replica = cg.replica_set:build_and_add_server{ + alias = 'server2', + box_cfg = box_cfg, + } + cg.replica_set:start() + cg.leader:exec(function() + box.ctl.promote() + local s = box.schema.space.create('s', {is_sync = true}) + s:create_index('pk') + end) + cg.replica_set:wait_for_fullmesh() + cg.leader:wait_for_downstream_to(cg.replica) +end) + +g.after_each(function(cg) + cg.replica_set:drop() +end) + +-- Test that new `age` field of `box.info.synchro.queue` works correctly. +g.test_age_field = function(cg) + -- The synchronous queue is originally empty. + cg.leader:exec(function() + t.assert_equals(box.info.synchro.queue.age, 0) + end) + cg.replica:exec(function() + t.assert_equals(box.info.synchro.queue.age, 0) + end) + + -- Ensure that the synchronous transactions stay in the queue for a while. + cg.leader:update_box_cfg{replication_synchro_quorum = 3} + -- Add the first entry to the synchronous queue. + local fid1 = cg.leader:exec(function() + local fiber = require('fiber') + + local f = fiber.new(function() box.space.s:replace{0} end) + f:set_joinable(true) + return f:id() + end) + local wait_time = 0.1 + fiber.sleep(wait_time) + + local leader_age = cg.leader:exec(function(wait_time) + t.assert_ge(box.info.synchro.queue.age, wait_time) + t.assert_le(box.info.synchro.queue.age, + box.cfg.replication_synchro_timeout) + return box.info.synchro.queue.age + end, {wait_time}) + local replica_age = cg.replica:exec(function() + t.assert_ge(box.info.synchro.queue.age, 0) + t.assert_le(box.info.synchro.queue.age, + box.cfg.replication_synchro_timeout) + return box.info.synchro.queue.age + end) + + -- Add another entry to the synchronous queue. + local fid2 = cg.leader:exec(function() + local f = require('fiber').new(function() box.space.s:replace{0} end) + f:set_joinable(true) + return f:id() + end) + fiber.sleep(wait_time) + + -- The age of the oldest synchronous queue entry must be shown. + cg.leader:exec(function(age) + t.assert_ge(box.info.synchro.queue.age, age) + t.assert_le(box.info.synchro.queue.age, + box.cfg.replication_synchro_timeout) + end, {leader_age}) + cg.replica:exec(function(age) + t.assert_ge(box.info.synchro.queue.age, age) + t.assert_le(box.info.synchro.queue.age, + box.cfg.replication_synchro_timeout) + end, {replica_age}) + + -- Allow the synchronous queue to advance. + cg.leader:update_box_cfg{replication_synchro_quorum = ''} + cg.leader:exec(function(fids) + for _, fid in ipairs(fids) do + t.assert(require('fiber').find(fid):join()) + end + end, {{fid1, fid2}}) + cg.leader:wait_for_downstream_to(cg.replica) + + -- The synchronous queue must become empty by this time. + cg.leader:exec(function() + t.assert_equals(box.info.synchro.queue.age, 0) + end) + cg.replica:exec(function() + t.assert_equals(box.info.synchro.queue.age, 0) + end) +end