diff --git a/src/box/applier.h b/src/box/applier.h index f1f7ad6188114989f75ac5b6b0aeb497ae5163ad..a6863798bfc652a89fbe9426c82ca251f8308c99 100644 --- a/src/box/applier.h +++ b/src/box/applier.h @@ -67,6 +67,7 @@ struct applier { enum applier_state state; ev_tstamp lag, last_row_time; bool warning_said; + bool cfg_merge_flag; /* used by box_set_replication_source */ uint32_t id; char source[APPLIER_SOURCE_MAXLEN]; rb_node(struct applier) link; /* a set by source in cluster.cc */ diff --git a/src/box/box.cc b/src/box/box.cc index ac5944346b7a7ca69ecf62682ccb9344558e169c..6256663de517b39913079640ff0813b4aafbb93e 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -183,7 +183,11 @@ box_check_uri(const char *source, const char *option_name) static void box_check_replication_source(void) { - box_check_uri(cfg_gets("replication_source"), "replication_source"); + int count = cfg_getarr_size("replication_source"); + for (int i = 0; i < count; i++) { + const char *source = cfg_getarr_elem("replication_source", i); + box_check_uri(source, "replication_source"); + } } static enum wal_mode @@ -228,6 +232,44 @@ box_check_config() box_check_wal_mode(cfg_gets("wal_mode")); } +/* + * Sync box.cfg.replication_source and cluster registry. 
+ */ +static void +box_sync_replication_source(void) +{ + /* Reset cfg_merge_flag for all replicas */ + cluster_foreach_applier(applier) { + applier->cfg_merge_flag = false; + } + + /* Add new replicas and set cfg_merge_flag for existing */ + int count = cfg_getarr_size("replication_source"); + for (int i = 0; i < count; i++) { + const char *source = cfg_getarr_elem("replication_source", i); + /* Try to find applier with the same source in the registry */ + struct applier *applier = cluster_find_applier(source); + if (applier == NULL) { + /* Start new applier using specified source */ + applier = applier_new(source); /* may throw */ + cluster_add_applier(applier); + } + applier->cfg_merge_flag = true; + } + + /* Remove replicas without cfg_merge_flag */ + struct applier *applier = cluster_applier_first(); + while (applier != NULL) { + struct applier *next = cluster_applier_next(applier); + if (!applier->cfg_merge_flag) { + applier_stop(applier); /* cancels a background fiber */ + cluster_del_applier(applier); + applier_delete(applier); + } + applier = next; + } +} + extern "C" void box_set_replication_source(void) { @@ -240,25 +282,19 @@ box_set_replication_source(void) return; } - const char *source = cfg_gets("replication_source"); + box_sync_replication_source(); - /* This hook is only invoked if source has changed */ - struct applier *applier = cluster_applier_first(); - while (applier != NULL) { - struct applier *next = cluster_applier_next(applier); - applier_stop(applier); /* cancels a background fiber */ - cluster_del_applier(applier); - applier_delete(applier); - applier = next; /* safe iteration with cluster_del_applier */ + /* Start all replicas from the cluster registry */ + cluster_foreach_applier(applier) { + if (applier->reader == NULL) { + applier_start(applier, recovery); + } else if (applier->state == APPLIER_OFF || + applier->state == APPLIER_STOPPED) { + /* Re-start faulted replicas */ + applier_stop(applier); + applier_start(applier, recovery); 
+ } } - - if (source == NULL) - return; - - /* Start a new replication client using provided URI */ - applier = applier_new(source); - cluster_add_applier(applier); - applier_start(applier, recovery); /* starts a background fiber */ } extern "C" void @@ -269,6 +305,29 @@ box_set_listen(void) iproto_set_listen(uri); } +/** + * Check if (host, port) in box.cfg.listen is equal to (host, port) in uri. + * Used to determine that a uri from box.cfg.replication_source + * actually points to the same address as box.cfg.listen. + */ +static bool +box_cfg_listen_eq(struct uri *what) +{ + const char *listen = cfg_gets("listen"); + if (listen == NULL) + return false; + + struct uri uri; + int rc = uri_parse(&uri, listen); + assert(rc == 0 && uri.service); + (void) rc; + + return (uri.service_len == what->service_len && + uri.host_len == what->host_len && + memcmp(uri.service, what->service, uri.service_len) == 0 && + memcmp(uri.host, what->host, uri.host_len) == 0); +} + extern "C" void box_set_wal_mode(void) { @@ -737,18 +796,21 @@ box_init(void) recovery_setup_panic(recovery, cfg_geti("panic_on_snap_error"), cfg_geti("panic_on_wal_error")); - const char *source = cfg_gets("replication_source"); - if (source != NULL) { - struct applier *applier = applier_new(source); - cluster_add_applier(applier); - } + + /* + * Initialize the cluster registry using replication_source, + * but don't start replication right now. + */ + box_sync_replication_source(); + /* Use the first replica by URI as a bootstrap leader */ + struct applier *applier = cluster_applier_first(); if (recovery_has_data(recovery)) { /* Tell Sophia engine LSN it must recover to. 
*/ int64_t checkpoint_id = recovery_last_checkpoint(recovery); engine_recover_to_checkpoint(checkpoint_id); - } else if (source != NULL) { + } else if (applier != NULL && !box_cfg_listen_eq(&applier->uri)) { /* Generate Server-UUID */ tt_uuid_create(&recovery->server_uuid); @@ -759,7 +821,6 @@ box_init(void) vclock_add_server(&recovery->vclock, 0); /* Bootstrap from the first master */ - struct applier *applier = cluster_applier_first(); applier_start(applier, recovery); applier_wait(applier); /* throws on failure */ @@ -797,11 +858,8 @@ box_init(void) rmean_cleanup(rmean_box); - cluster_foreach_applier(applier) { - /* Follow replica */ - assert(recovery->writer); - applier_start(applier, recovery); - } + /* Follow replica */ + box_set_replication_source(); /* Enter read-write mode. */ if (recovery->server_id > 0) diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua index 5668675253f2839bf60518cfb9f53c673ba998e6..0fc959501ae6d3052effe5013c6073798ebdd60c 100644 --- a/src/box/lua/load_cfg.lua +++ b/src/box/lua/load_cfg.lua @@ -99,7 +99,7 @@ local template_cfg = { wal_dir_rescan_delay= 'number', panic_on_snap_error = 'boolean', panic_on_wal_error = 'boolean', - replication_source = 'string, number', + replication_source = 'string, number, table', custom_proc_title = 'string', pid_file = 'string', background = 'boolean', diff --git a/src/cfg.cc b/src/cfg.cc index b4926abd22c3c1fd76ef9d9bbb70d446c801e336..0e7f5b22e4c997fbf7a0bd1dae01abfa6a52784e 100644 --- a/src/cfg.cc +++ b/src/cfg.cc @@ -91,7 +91,16 @@ int cfg_getarr_size(const char *name) { cfg_get(name); - luaL_checktype(tarantool_L, -1, LUA_TTABLE); + if (lua_isnil(tarantool_L, -1)) { + /* missing value is equal to empty array */ + lua_pop(tarantool_L, 1); + return 0; + } else if (!lua_istable(tarantool_L, -1)) { + /* scalars are handled like an array with one element */ + lua_pop(tarantool_L, 1); + return 1; + } + int result = luaL_getn(tarantool_L, -1); lua_pop(tarantool_L, 1); return result; @@ 
-101,10 +110,16 @@ const char * cfg_getarr_elem(const char *name, int i) { cfg_get(name); - luaL_checktype(tarantool_L, -1, LUA_TTABLE); + if (!lua_istable(tarantool_L, -1)) { + /* scalars are handled like an array with one element */ + assert(i == 0 && !lua_isnil(tarantool_L, -1)); + const char *val = cfg_tostring(tarantool_L); + lua_pop(tarantool_L, 1); + return val; + } + lua_rawgeti(tarantool_L, -1, i + 1); const char *val = cfg_tostring(tarantool_L); lua_pop(tarantool_L, 2); return val; } - diff --git a/test/replication/multi.result b/test/replication/multi.result new file mode 100644 index 0000000000000000000000000000000000000000..6b392325d21d8bb7fb45986d0ee853342c4be5ac --- /dev/null +++ b/test/replication/multi.result @@ -0,0 +1,81 @@ +fiber = require('fiber') +--- +... +box.schema.user.grant('guest', 'replication') +--- +... +box.schema.user.grant('guest', 'execute', 'universe') +--- +... +---------------------------------------------------------------------- +Bootstrap replicas +---------------------------------------------------------------------- +done +---------------------------------------------------------------------- +Make a full mesh +---------------------------------------------------------------------- +server 1 connected +server 1 connected +server 1 connected +box.info.vclock +--- +- - 4 + - 0 + - 0 +... +server 2 connected +server 2 connected +server 2 connected +box.info.vclock +--- +- - 4 + - 0 + - 0 +... +server 3 connected +server 3 connected +server 3 connected +box.info.vclock +--- +- - 4 + - 0 + - 0 +... +done +---------------------------------------------------------------------- +Test inserts +---------------------------------------------------------------------- +Create a test space +_ = box.schema.space.create('test') +--- +... +_ = box.space.test:create_index('primary') +--- +... 
+server 1 is ok +server 2 is ok +server 3 is ok + +Insert records +inserted 60 records + +Synchronize +server 3 done +server 3 done +server 3 done +done + +Check data +server 1 is ok +server 2 is ok +server 3 is ok +Done + + +---------------------------------------------------------------------- +Cleanup +---------------------------------------------------------------------- +server 1 done +server 2 done +server 3 done + diff --git a/test/replication/multi.test.py b/test/replication/multi.test.py new file mode 100644 index 0000000000000000000000000000000000000000..6a564d9967b626d95feb8445f68a426b99568c97 --- /dev/null +++ b/test/replication/multi.test.py @@ -0,0 +1,111 @@ +import sys +import os +from lib.tarantool_server import TarantoolServer +import yaml + +REPLICA_N = 3 +ROW_N = REPLICA_N * 20 + +## + +# master server +master = server +master.admin("fiber = require('fiber')") +master.admin("box.schema.user.grant('guest', 'replication')") +master.admin("box.schema.user.grant('guest', 'execute', 'universe')") + +print '----------------------------------------------------------------------' +print 'Bootstrap replicas' +print '----------------------------------------------------------------------' + +# Start replicas +master.id = master.get_param('server')['id'] +master_lsn = master.get_lsn(master.id) +cluster = [ master ] +for i in range(REPLICA_N - 1): + server = TarantoolServer(server.ini) + server.script = 'replication/replica.lua' + server.vardir = os.path.join(server.vardir, 'replica', str(master.id + i)) + server.rpl_master = master + server.deploy() + # Wait replica to fully bootstrap. + # Otherwise can get ACCESS_DENIED error. 
+ server.wait_lsn(master.id, master_lsn) + cluster.append(server) + +# Make a list of servers +sources = [] +for server in cluster: + sources.append(yaml.load(server.admin('box.cfg.listen', silent = True))[0]) + server.id = server.get_param('server')['id'] + +print 'done' + +print '----------------------------------------------------------------------' +print 'Make a full mesh' +print '----------------------------------------------------------------------' + +# Connect each server to each other to make full mesh +for server in cluster: + server.iproto.py_con.eval("box.cfg { replication_source = ... }", [sources]) + +# Wait connections to establish +for server in cluster: + for server2 in cluster: + server.iproto.py_con.eval(""" + while #box.info.vclock[...] ~= nil do + fiber.sleep(0.01) + end;""", server2.id) + print 'server', server.id, "connected" + server.admin("box.info.vclock") + +print 'done' + +print '----------------------------------------------------------------------' +print 'Test inserts' +print '----------------------------------------------------------------------' + +print 'Create a test space' +master.admin("_ = box.schema.space.create('test')") +master.admin("_ = box.space.test:create_index('primary')") +master_lsn = master.get_lsn(master.id) +# Wait changes to propagate to replicas +for server in cluster: + server.wait_lsn(master.id, master_lsn) + print 'server', server.id, 'is ok' +print + +print 'Insert records' +for i in range(ROW_N): + server = cluster[i % REPLICA_N] + server.admin("box.space.test:insert{%d, %s}" % (i, server.id), silent = True) +print 'inserted %d records' % ROW_N +print + +print 'Synchronize' +for server1 in cluster: + for server2 in cluster: + server1.wait_lsn(server2.id, server2.get_lsn(server2.id)) + print 'server', server.id, 'done' +print 'done' +print + +print 'Check data' +for server in cluster: + cnt = yaml.load(server.admin("box.space.test:len()", silent = True))[0] + print 'server', server.id, 'is', cnt == ROW_N 
and 'ok' or 'not ok' +print 'Done' +print + +print +print '----------------------------------------------------------------------' +print 'Cleanup' +print '----------------------------------------------------------------------' + +for server in cluster: + server.stop() + print 'server', server.id, 'done' +print + +master.cleanup() +master.deploy()