diff --git a/src/box/recovery.cc b/src/box/recovery.cc index cc718dd92f265c9c96abebd639bc6e3a29169899..1a431db157fc7fe93fe7670e6048869ca5d06a52 100644 --- a/src/box/recovery.cc +++ b/src/box/recovery.cc @@ -48,6 +48,7 @@ #include "box/cluster.h" #include "vclock.h" #include "session.h" +#include "coio.h" /* * Recovery subsystem @@ -150,10 +151,7 @@ void wal_writer_stop(struct recovery_state *r); static void -wal_watcher_stop(struct recovery_state *r); - -static void -wal_watcher_init(struct wal_watcher *watcher); +recovery_stop_local(struct recovery_state *r); /** * Throws an exception in case of error. @@ -200,7 +198,7 @@ recovery_new(const char *snap_dirname, const char *wal_dirname, */ xdir_check(&r->wal_dir); - wal_watcher_init(&r->watcher); + r->watcher = NULL; recovery_init_remote(r); guard.is_active = false; @@ -240,7 +238,7 @@ recovery_close_log(struct recovery_state *r) void recovery_delete(struct recovery_state *r) { - wal_watcher_stop(r); + recovery_stop_local(r); if (r->writer) wal_writer_stop(r); @@ -382,10 +380,9 @@ recover_snap(struct recovery_state *r) * This function will not close r->current_wal if * recovery was successful. */ -static int +static void recover_remaining_wals(struct recovery_state *r) { - int result = 0; struct xlog *next_wal; int64_t current_signature, last_signature; struct vclock *current_vclock; @@ -397,7 +394,7 @@ recover_remaining_wals(struct recovery_state *r) current_vclock = vclockset_last(&r->wal_dir.index); last_signature = current_vclock != NULL ? vclock_signature(current_vclock) : -1; - /* if the caller already opened WAL for us, recover from it first */ + /* If the caller already opened WAL for us, recover from it first */ if (r->current_wal != NULL) { if (r->signature == -1) { r->signature @@ -455,7 +452,6 @@ recover_remaining_wals(struct recovery_state *r) &r->wal_dir, current_signature, INPROGRESS)) != 0) panic("can't unlink 'inprogres' WAL"); } - result = 0; break; } assert(r->current_wal == NULL); @@ -465,14 +461,9 @@ recover_remaining_wals(struct recovery_state *r) recover_current_wal: rows_before = r->current_wal->rows; - try { - result = recover_xlog(r, r->current_wal); - } catch (Exception *e) { - say_error("failure reading from %s", - r->current_wal->filename); - e->log(); - break; - } + + int result = recover_xlog(r, r->current_wal); + if (r->current_wal->rows > 0 && r->current_wal->rows != rows_before) { @@ -516,29 +507,24 @@ recover_remaining_wals(struct recovery_state *r) * we lose some logs it is a fatal error. */ if (last_signature > r->signature) { - say_error("not all WALs have been successfully read"); - result = -1; + tnt_raise(XlogError, + "not all WALs have been successfully read"); } region_free(&fiber()->gc); - return result; } void recovery_finalize(struct recovery_state *r, int rows_per_wal) { - int result; - wal_watcher_stop(r); + recovery_stop_local(r); r->finalize = true; - result = recover_remaining_wals(r); - - if (result < 0) - panic("unable to successfully finalize recovery"); + recover_remaining_wals(r); - if (r->current_wal != NULL && result != LOG_EOF) { + if (r->current_wal != NULL) { say_warn("WAL `%s' wasn't correctly closed", r->current_wal->filename); if (!r->current_wal->is_inprogress) { @@ -558,9 +544,10 @@ recovery_finalize(struct recovery_state *r, int rows_per_wal) if (xlog_rename(r->current_wal) != 0) panic("can't rename 'inprogress' WAL '%s'", r->current_wal->filename); - } else + } else { panic("too many rows in 'inprogress' WAL '%s'", r->current_wal->filename); + } recovery_close_log(r); } @@ -573,114 +560,53 @@ recovery_finalize(struct recovery_state *r, int rows_per_wal) /* {{{ Local recovery: support of hot standby and replication relay */ -static void recovery_rescan_file(ev_loop *, ev_stat *w, int /* revents */); - -static void -recovery_watch_file(ev_loop *loop, struct wal_watcher *watcher, - struct xlog *wal) -{ - strncpy(watcher->filename, wal->filename, PATH_MAX); - ev_stat_init(&watcher->stat, recovery_rescan_file, - watcher->filename, 0.); - ev_stat_start(loop, &watcher->stat); -} - -static void -recovery_stop_file(struct wal_watcher *watcher) -{ - ev_stat_stop(loop(), &watcher->stat); -} - static void -recovery_rescan_dir(ev_loop * loop, ev_timer *w, int /* revents */) +recovery_follow_f(va_list ap) { - struct recovery_state *r = (struct recovery_state *) w->data; - struct wal_watcher *watcher = &r->watcher; - struct xlog *save_current_wal = r->current_wal; - - /** - * local hot standby is running from an ev - * watcher, without fiber infrastructure (todo: fix), - * but to run queries we need at least a current - * user. - */ + struct recovery_state *r = va_arg(ap, struct recovery_state *); + ev_tstamp wal_dir_rescan_delay = va_arg(ap, ev_tstamp); fiber_set_user(fiber(), &admin_credentials); - int result; - try { - result = recover_remaining_wals(r); - } catch (Exception *e) { - e->log(); - result = -1; - } - fiber_set_user(fiber(), NULL); - if (result < 0) - panic("recover failed: %i", result); - if (save_current_wal != r->current_wal) { - if (save_current_wal != NULL) - recovery_stop_file(watcher); - if (r->current_wal != NULL) - recovery_watch_file(loop, watcher, r->current_wal); - } -} -static void -recovery_rescan_file(ev_loop * loop, ev_stat *w, int /* revents */) -{ - struct recovery_state *r = (struct recovery_state *) w->data; - struct wal_watcher *watcher = &r->watcher; - fiber_set_user(fiber(), &admin_credentials); - try { - if (recover_xlog(r, r->current_wal) == LOG_EOF) { - say_info("done `%s'", r->current_wal->filename); - recovery_close_log(r); - recovery_stop_file(watcher); - /* Don't wait for wal_dir_rescan_delay. */ - recovery_rescan_dir(loop, &watcher->dir_timer, 0); + while (true) { + recover_remaining_wals(r); + /** + * Allow an immediate wakeup/break loop + * from recovery_stop_local(). + */ + fiber_set_cancellable(true); + if (r->current_wal != NULL) { + ev_stat stat; + coio_stat_init(&stat, r->current_wal->filename); + coio_stat_stat_timeout(&stat, wal_dir_rescan_delay); + } else { + fiber_yield_timeout(wal_dir_rescan_delay); } - } catch (Exception *e) { - e->log(); - panic("recover failed"); + if (fiber_is_cancelled()) + return; + fiber_set_cancellable(false); } - fiber_set_user(fiber(), NULL); -} - -static void -wal_watcher_init(struct wal_watcher *watcher) -{ - watcher->filename[0] = '\0'; - ev_init(&watcher->dir_timer, recovery_rescan_dir); - ev_init(&watcher->stat, recovery_rescan_file); } - void -recovery_follow_local(struct recovery_state *r, ev_tstamp wal_dir_rescan_delay) +recovery_follow_local(struct recovery_state *r, + ev_tstamp wal_dir_rescan_delay) { assert(r->writer == NULL); - ev_loop *loop = loop(); - - struct wal_watcher *watcher = &r->watcher; - - ev_timer_init(&watcher->dir_timer, recovery_rescan_dir, - wal_dir_rescan_delay, wal_dir_rescan_delay); - watcher->dir_timer.data = watcher->stat.data = r; - ev_timer_start(loop, &watcher->dir_timer); - /* - * recover() leaves the current wal open if it has no - * EOF marker. - */ - if (r->current_wal != NULL) - recovery_watch_file(loop, watcher, r->current_wal); + assert(r->watcher == NULL); + r->watcher = fiber_new(fiber_name(fiber()), recovery_follow_f); + fiber_set_joinable(r->watcher, true); + fiber_start(r->watcher, r, wal_dir_rescan_delay); } static void -wal_watcher_stop(struct recovery_state *r) +recovery_stop_local(struct recovery_state *r) { - struct wal_watcher *watcher = &r->watcher; - if (ev_is_active(&watcher->dir_timer)) - ev_timer_stop(loop(), &watcher->dir_timer); - if (ev_is_active(&watcher->stat)) - ev_stat_stop(loop(), &watcher->stat); + if (r->watcher) { + struct fiber *f = r->watcher; + r->watcher = NULL; + fiber_cancel(f); + fiber_join(f); + } } /* }}} */ diff --git a/src/box/recovery.h b/src/box/recovery.h index fcdde1f8eec6c28b212ab0be5b98bd1988b11e83..3a0af00a4f1ef36eb753fe29db39151b45f8519e 100644 --- a/src/box/recovery.h +++ b/src/box/recovery.h @@ -91,26 +91,6 @@ struct remote { socklen_t addr_len; }; -/** - * This is used in local hot standby or replication - * relay mode: look for changes in the wal_dir and apply them - * locally or send to the replica. - */ -struct wal_watcher { - /** - * Rescan the WAL directory in search for new WAL files - * every wal_dir_rescan_delay seconds. - */ - ev_timer dir_timer; - /** - * When the latest WAL does not contain a EOF marker, - * re-read its tail on every change in file metadata. - */ - ev_stat stat; - /** Path to the file being watched with 'stat'. */ - char filename[PATH_MAX+1]; -}; - struct recovery_state { struct vclock vclock; /** The WAL we're currently reading/writing from/to. */ @@ -120,7 +100,12 @@ struct recovery_state { /** Used to find missing xlog files */ int64_t signature; struct wal_writer *writer; - struct wal_watcher watcher; + /** + * This is used in local hot standby or replication + * relay mode: look for changes in the wal_dir and apply them + * locally or send to the replica. + */ + struct fiber *watcher; union { /** slave->master state */ struct remote remote; diff --git a/src/box/replica.cc b/src/box/replica.cc index c04f4f5b5eb2dd0c62e3e5f16a1da4cff3e3bd59..11d3b825e6ed3355fde724fc39212c0fd56b6e24 100644 --- a/src/box/replica.cc +++ b/src/box/replica.cc @@ -263,7 +263,7 @@ pull_from_remote(va_list ap) remote_set_status(&r->remote, "off"); throw; } catch (Exception *e) { - remote_set_status(&r->remote, "failed"); + remote_set_status(&r->remote, "disconnected"); if (! r->remote.warning_said) { if (err != NULL) say_info("%s", err); diff --git a/src/box/replication.cc b/src/box/replication.cc index a451c50484c34962625082a29c19748511a1ab48..66ec9bfa6b43d38474b4c8e13e7aee439063cecb 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -685,9 +685,6 @@ replication_relay_subscribe(struct recovery_state *r) r->server_id = r->relay.server_id; recovery_follow_local(r, 0.1); - ev_run(loop(), 0); - - say_crit("exiting the relay loop"); } /** The main loop of replication client service process. */ @@ -748,7 +745,6 @@ replication_relay_loop(struct relay *relay) sio_setfl(relay->sock, O_NONBLOCK, 0); /* Initialize the recovery process */ - int rc = EXIT_SUCCESS; struct recovery_state *r = NULL; try { r = recovery_new(cfg_snap_dir, cfg_wal_dir, @@ -759,9 +755,10 @@ replication_relay_loop(struct relay *relay) replication_relay_subscribe(r); } catch (Exception *e) { say_error("relay error: %s", e->errmsg()); - rc = EXIT_FAILURE; + if (r) + recovery_delete(r); + exit(EXIT_FAILURE); } - if (r) - recovery_delete(r); - exit(rc); + /** Return control back to the sched. */ + fiber_yield(); }