From 1fe8eac554011fb6a1f6e350fa0b83e3735454e7 Mon Sep 17 00:00:00 2001 From: Konstantin Osipov <kostja@tarantool.org> Date: Fri, 16 Jan 2015 17:10:58 +0300 Subject: [PATCH] Stop replication on apply error. --- src/box/error.cc | 3 +-- src/box/replica.cc | 48 +++++++++++++++++++--------------------------- 2 files changed, 21 insertions(+), 30 deletions(-) diff --git a/src/box/error.cc b/src/box/error.cc index 5b92e59b04..8f9bde9877 100644 --- a/src/box/error.cc +++ b/src/box/error.cc @@ -54,8 +54,7 @@ ClientError::ClientError(const char *file, unsigned line, const char *msg, void ClientError::log() const { - _say(S_ERROR, m_file, m_line, NULL, "%s: %s", tnt_errcode_str(m_errcode), - m_errmsg); + _say(S_ERROR, m_file, m_line, m_errmsg, "%s", tnt_errcode_str(m_errcode)); } diff --git a/src/box/replica.cc b/src/box/replica.cc index 867b5df592..7c17abf141 100644 --- a/src/box/replica.cc +++ b/src/box/replica.cc @@ -220,20 +220,23 @@ pull_from_remote(va_list ap) { struct recovery_state *r = va_arg(ap, struct recovery_state *); struct ev_io coio; - struct iobuf *iobuf = NULL; + struct iobuf *iobuf = iobuf_new(fiber_name(fiber())); ev_loop *loop = loop(); coio_init(&coio); - for (;;) { + auto coio_guard = make_scoped_guard([&] { + iobuf_delete(iobuf); + evio_close(loop(), &coio); + }); + + while (true) { const char *err = NULL; try { struct xrow_header row; fiber_setcancellable(true); if (! evio_is_active(&coio)) { remote_set_status(&r->remote, "connecting"); - if (iobuf == NULL) - iobuf = iobuf_new(fiber_name(fiber())); err = "can't connect to master"; remote_connect(r, &coio, iobuf); /* Send SUBSCRIBE request */ @@ -241,42 +244,35 @@ pull_from_remote(va_list ap) xrow_encode_subscribe(&row, &cluster_id, &r->server_uuid, &r->vclock); remote_write_row(&coio, &row); - remote_read_row(&coio, iobuf, &row); - if (iproto_type_is_error(row.type)) { - try { - xrow_decode_error(&row); - } catch (ClientError *e) { - e->log(); - panic("replication subscribe " - "failed"); - } - } r->remote.warning_said = false; remote_set_status(&r->remote, "connected"); } err = "can't read row"; + /** + * If there is an error in subscribe, it's + * sent directly in response to subscribe. + * If subscribe is successful, there is no + * "OK" response, but a stream of rows. + * from the binary log. + */ remote_read_row(&coio, iobuf, &row); - if (iproto_type_is_error(row.type)) - xrow_decode_error(&row); /* error */ fiber_setcancellable(false); err = NULL; - r->remote.recovery_lag = ev_now(loop) - row.tm; r->remote.recovery_last_update_tstamp = ev_now(loop); - /** - * XXX: deal with apply conflict, - * it's ignored now. - */ + if (iproto_type_is_error(row.type)) + xrow_decode_error(&row); /* error */ recovery_process(r, &row); iobuf_reset(iobuf); fiber_gc(); + } catch (ClientError *e) { + remote_set_status(&r->remote, "stopped"); + throw; } catch (FiberCancelException *e) { remote_set_status(&r->remote, "failed"); - iobuf_delete(iobuf); - evio_close(loop, &coio); throw; } catch (Exception *e) { remote_set_status(&r->remote, "failed"); @@ -322,11 +318,7 @@ recovery_follow_remote(struct recovery_state *r) say_crit("starting replication from %s", uri); snprintf(name, sizeof(name), "replica/%s", uri); - try { - f = fiber_new(name, pull_from_remote); - } catch (Exception *e) { - return; - } + f = fiber_new(name, pull_from_remote); r->remote.reader = f; fiber_call(f, r); -- GitLab