Skip to content
Snippets Groups Projects
Commit 1fe8eac5 authored by Konstantin Osipov's avatar Konstantin Osipov
Browse files

Stop replication on apply error.

parent 73211306
No related branches found
No related tags found
No related merge requests found
......@@ -54,8 +54,7 @@ ClientError::ClientError(const char *file, unsigned line, const char *msg,
void
ClientError::log() const
{
_say(S_ERROR, m_file, m_line, NULL, "%s: %s", tnt_errcode_str(m_errcode),
m_errmsg);
_say(S_ERROR, m_file, m_line, m_errmsg, "%s", tnt_errcode_str(m_errcode));
}
......
......@@ -220,20 +220,23 @@ pull_from_remote(va_list ap)
{
struct recovery_state *r = va_arg(ap, struct recovery_state *);
struct ev_io coio;
struct iobuf *iobuf = NULL;
struct iobuf *iobuf = iobuf_new(fiber_name(fiber()));
ev_loop *loop = loop();
coio_init(&coio);
for (;;) {
auto coio_guard = make_scoped_guard([&] {
iobuf_delete(iobuf);
evio_close(loop(), &coio);
});
while (true) {
const char *err = NULL;
try {
struct xrow_header row;
fiber_setcancellable(true);
if (! evio_is_active(&coio)) {
remote_set_status(&r->remote, "connecting");
if (iobuf == NULL)
iobuf = iobuf_new(fiber_name(fiber()));
err = "can't connect to master";
remote_connect(r, &coio, iobuf);
/* Send SUBSCRIBE request */
......@@ -241,42 +244,35 @@ pull_from_remote(va_list ap)
xrow_encode_subscribe(&row, &cluster_id,
&r->server_uuid, &r->vclock);
remote_write_row(&coio, &row);
remote_read_row(&coio, iobuf, &row);
if (iproto_type_is_error(row.type)) {
try {
xrow_decode_error(&row);
} catch (ClientError *e) {
e->log();
panic("replication subscribe "
"failed");
}
}
r->remote.warning_said = false;
remote_set_status(&r->remote, "connected");
}
err = "can't read row";
/**
* If there is an error in subscribe, it's
* sent directly in response to subscribe.
* If subscribe is successful, there is no
* "OK" response, but a stream of rows.
* from the binary log.
*/
remote_read_row(&coio, iobuf, &row);
if (iproto_type_is_error(row.type))
xrow_decode_error(&row); /* error */
fiber_setcancellable(false);
err = NULL;
r->remote.recovery_lag = ev_now(loop) - row.tm;
r->remote.recovery_last_update_tstamp =
ev_now(loop);
/**
* XXX: deal with apply conflict,
* it's ignored now.
*/
if (iproto_type_is_error(row.type))
xrow_decode_error(&row); /* error */
recovery_process(r, &row);
iobuf_reset(iobuf);
fiber_gc();
} catch (ClientError *e) {
remote_set_status(&r->remote, "stopped");
throw;
} catch (FiberCancelException *e) {
remote_set_status(&r->remote, "failed");
iobuf_delete(iobuf);
evio_close(loop, &coio);
throw;
} catch (Exception *e) {
remote_set_status(&r->remote, "failed");
......@@ -322,11 +318,7 @@ recovery_follow_remote(struct recovery_state *r)
say_crit("starting replication from %s", uri);
snprintf(name, sizeof(name), "replica/%s", uri);
try {
f = fiber_new(name, pull_from_remote);
} catch (Exception *e) {
return;
}
f = fiber_new(name, pull_from_remote);
r->remote.reader = f;
fiber_call(f, r);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment