diff --git a/src/box/replica.cc b/src/box/replica.cc index b9c6a7be293d7e5053451a34d135a9f30da979f1..b4699a317c302613829eae3543d94502bb2fd812 100644 --- a/src/box/replica.cc +++ b/src/box/replica.cc @@ -43,6 +43,8 @@ #include "session.h" #include "box/cluster.h" +static const int RECONNECT_DELAY = 1.0; + static void remote_read_row(struct ev_io *coio, struct iobuf *iobuf, struct iproto_header *row) @@ -128,7 +130,26 @@ replica_bootstrap(struct recovery_state *r) evio_close(loop(), &coio); }); - remote_connect(r, &coio, iobuf); + for (;;) { + try { + remote_connect(r, &coio, iobuf); + r->remote.warning_said = false; + break; + } catch (FiberCancelException *e) { + throw; + } catch (Exception *e) { + if (! r->remote.warning_said) { + say_error("can't connect to master"); + e->log(); + say_info("will retry every %i second", + RECONNECT_DELAY); + r->remote.warning_said = true; + } + iobuf_reset(iobuf); + evio_close(loop(), &coio); + } + fiber_sleep(RECONNECT_DELAY); + } /* Send JOIN request */ struct iproto_header row; @@ -177,8 +198,6 @@ pull_from_remote(va_list ap) struct recovery_state *r = va_arg(ap, struct recovery_state *); struct ev_io coio; struct iobuf *iobuf = NULL; - bool warning_said = false; - const int reconnect_delay = 1; ev_loop *loop = loop(); /** This fiber executes transactions. */ SessionGuard session_guard(-1, 0); @@ -201,7 +220,7 @@ pull_from_remote(va_list ap) iproto_encode_subscribe(&row, &cluster_id, &r->server_uuid, &r->vclock); remote_write_row(&coio, &row); - warning_said = false; + r->remote.warning_said = false; remote_set_status(&r->remote, "connected"); } err = "can't read row"; @@ -226,12 +245,13 @@ pull_from_remote(va_list ap) throw; } catch (Exception *e) { remote_set_status(&r->remote, "failed"); - e->log(); - if (! warning_said) { + if (! r->remote.warning_said) { if (err != NULL) say_info("%s", err); - say_info("will retry every %i second", reconnect_delay); - warning_said = true; + e->log(); + say_info("will retry every %i second", + RECONNECT_DELAY); + r->remote.warning_said = true; } evio_close(loop, &coio); } @@ -250,7 +270,7 @@ pull_from_remote(va_list ap) * See: https://github.com/tarantool/tarantool/issues/136 */ if (! evio_is_active(&coio)) - fiber_sleep(reconnect_delay); + fiber_sleep(RECONNECT_DELAY); } } diff --git a/src/box/replica.h b/src/box/replica.h index bf6dbeb0117a1754d285eeca074b1b8ff3552e84..0f79c512ec4720afef23d8f095bf3c56c24d8c80 100644 --- a/src/box/replica.h +++ b/src/box/replica.h @@ -39,6 +39,7 @@ struct remote { struct uri uri; struct fiber *reader; ev_tstamp recovery_lag, recovery_last_update_tstamp; + bool warning_said; char source[REMOTE_SOURCE_MAXLEN]; }; diff --git a/test/lib/tarantool_server.py b/test/lib/tarantool_server.py index b47d48dffa54ca79757db1a4094b4e97d54d2991..605140d79026367c249f1c9f92e50c77db9b7dd0 100644 --- a/test/lib/tarantool_server.py +++ b/test/lib/tarantool_server.py @@ -423,10 +423,10 @@ class TarantoolServer(Server): self._admin = find_port(port) self._sql = find_port(port + 1) - def deploy(self, silent=True): + def deploy(self, silent=True, wait = True): self.install(silent) if not self._start_and_exit: - self.start(silent) + self.start(silent=silent, wait=wait) else: self.start_and_exit() @@ -451,7 +451,7 @@ class TarantoolServer(Server): self.start() self.process.wait() - def start(self, silent=True): + def start(self, silent=True, wait = True): if self.status == 'started': if not silent: color_stdout('The server is already started.\n', schema='lerror') @@ -474,7 +474,8 @@ class TarantoolServer(Server): cwd = self.vardir, stdout=self.log_des, stderr=self.log_des) - self.wait_until_started() + if wait: + self.wait_until_started() self.status = 'started' def wait_stop(self): diff --git a/test/replication/init_storage.result b/test/replication/init_storage.result index f2561c71ff9140d1c4bb6a24baa265f53a47f0d1..4b4e01d1b8939e0d54e06c5615da008c6aee96e9 100644 --- a/test/replication/init_storage.result +++ b/test/replication/init_storage.result @@ -106,3 +106,10 @@ space:get{19} --- - [19, 6859] ... +------------------------------------------------------------- +reconnect on JOIN/SUBSCRIBE +------------------------------------------------------------- +waiting reconnect on JOIN... +ok +waiting reconnect on SUBSCRIBE... +ok diff --git a/test/replication/init_storage.test.py b/test/replication/init_storage.test.py index e7446c3a67c9e160dba897bc4c08a63af0111adf..27b8b9699f3dae09ef026be25b3baa64d71043c5 100644 --- a/test/replication/init_storage.test.py +++ b/test/replication/init_storage.test.py @@ -52,6 +52,33 @@ for i in range(1, 20): replica.stop() replica.cleanup(True) +print '-------------------------------------------------------------' +print 'reconnect on JOIN/SUBSCRIBE' +print '-------------------------------------------------------------' + server.stop() -server.deploy() +replica = TarantoolServer(server.ini) +replica.script = 'replication/replica.lua' +replica.vardir = os.path.join(server.vardir, 'replica') +replica.rpl_master = master +replica.deploy(wait=False) + +print 'waiting reconnect on JOIN...' +server.start() +replica.wait_until_started() +print 'ok' +replica.stop() +server.stop() + +print 'waiting reconnect on SUBSCRIBE...' +replica.start(wait=False) +server.start() +replica.wait_until_started() +print 'ok' + +replica.stop() +replica.cleanup(True) + +server.stop() +server.deploy()