diff --git a/src/box/box.cc b/src/box/box.cc index 162fdbfe122348f50ab5c539d4cfbb8b5faf3446..7dfe29c56ed392456bda9140e94f3f8ee9f55958 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -227,7 +227,12 @@ box_leave_local_standby_mode(void *data __attribute__((unused))) */ return; } - recovery_finalize(recovery, cfg_geti("rows_per_wal")); + try { + recovery_finalize(recovery, cfg_geti("rows_per_wal")); + } catch (Exception *e) { + e->log(); + panic("unable to successfully finalize recovery"); + } /* * notify engines about end of recovery. diff --git a/src/box/recovery.cc b/src/box/recovery.cc index 16666a20fd31c184c1f5843ccc21ddab523f0980..c4c0cdd5876104c4659126dcb1043a83fbb1084f 100644 --- a/src/box/recovery.cc +++ b/src/box/recovery.cc @@ -148,8 +148,12 @@ static int wal_writer_start(struct recovery_state *state, int rows_per_wal); void wal_writer_stop(struct recovery_state *r); + +static void +wal_watcher_stop(struct recovery_state *r); + static void -recovery_stop_local(struct recovery_state *r); +wal_watcher_init(struct wal_watcher *watcher); /** * Throws an exception in case of error. @@ -195,6 +199,8 @@ recovery_new(const char *snap_dirname, const char *wal_dirname, */ xdir_check(&r->wal_dir); + wal_watcher_init(&r->watcher); + guard.is_active = false; return r; } @@ -232,8 +238,7 @@ recovery_close_log(struct recovery_state *r) void recovery_delete(struct recovery_state *r) { - if (r->watcher) - recovery_stop_local(r); + wal_watcher_stop(r); if (r->writer) wal_writer_stop(r); @@ -288,6 +293,14 @@ recover_xlog(struct recovery_state *r, struct xlog *l) e->log(); } } + /** + * We should never try to read snapshots with no EOF + * marker - such snapshots are very likely unfinished + * or corrupted, and should not be trusted. + */ + if (l->dir->type == SNAP && l->is_inprogress == false && i.eof_read == false) + panic("snapshot `%s' has no EOF marker", l->filename); + /* * xlog_cursor_next() returns 1 when * it can not read more rows. This doesn't mean @@ -316,8 +329,7 @@ recovery_bootstrap(struct recovery_state *r) xlog_close(snap); }); /** The snapshot must have a EOF marker. */ - if (recover_xlog(r, snap) != LOG_EOF) - panic("can't process bootstrap snapshot"); + recover_xlog(r, snap); } /** @@ -354,8 +366,7 @@ recover_snap(struct recovery_state *r) vclock_add_server(&r->vclock, 0); say_info("recovering from `%s'", snap->filename); - if (recover_xlog(r, snap) != LOG_EOF) - panic("can't process snapshot"); + recover_xlog(r, snap); /* Replace server vclock using the data from snapshot */ vclock_copy(&r->vclock, &snap->vclock); } @@ -377,12 +388,7 @@ recover_remaining_wals(struct recovery_state *r) size_t rows_before; enum log_suffix suffix; - try { - xdir_scan(&r->wal_dir); - } catch (Exception *e) { - e->log(); - return -1; - } + xdir_scan(&r->wal_dir); current_vclock = vclockset_last(&r->wal_dir.index); last_signature = current_vclock != NULL ? @@ -514,8 +520,7 @@ recovery_finalize(struct recovery_state *r, int rows_per_wal) { int result; - if (r->watcher) - recovery_stop_local(r); + wal_watcher_stop(r); r->finalize = true; @@ -530,20 +535,23 @@ recovery_finalize(struct recovery_state *r, int rows_per_wal) if (!r->current_wal->is_inprogress) { if (r->current_wal->rows == 0) /* Regular WAL (not inprogress) must contain at least one row */ - panic("zero rows was successfully read from last WAL `%s'", + panic("zero rows was successfully read from last WAL '%s'", r->current_wal->filename); } else if (r->current_wal->rows == 0) { /* Unlink empty inprogress WAL */ - say_warn("unlink broken %s WAL", r->current_wal->filename); + say_warn("deleting broken WAL '%s'", r->current_wal->filename); if (inprogress_log_unlink(r->current_wal->filename) != 0) panic("can't unlink 'inprogress' WAL"); } else if (r->current_wal->rows <= 1 /* one row */) { /* Rename inprogress wal with one row */ - say_warn("rename unfinished %s WAL", r->current_wal->filename); + say_warn("renaming unfinished WAL '%s'", + r->current_wal->filename); if (xlog_rename(r->current_wal) != 0) - panic("can't rename 'inprogress' WAL"); + panic("can't rename 'inprogress' WAL '%s'", + r->current_wal->filename); } else - panic("too many rows in inprogress WAL `%s'", r->current_wal->filename); + panic("too many rows in 'inprogress' WAL '%s'", + r->current_wal->filename); recovery_close_log(r); } @@ -556,28 +564,6 @@ recovery_finalize(struct recovery_state *r, int rows_per_wal) /* {{{ Local recovery: support of hot standby and replication relay */ -/** - * This is used in local hot standby or replication - * relay mode: look for changes in the wal_dir and apply them - * locally or send to the replica. - */ -struct wal_watcher { - /** - * Rescan the WAL directory in search for new WAL files - * every wal_dir_rescan_delay seconds. - */ - ev_timer dir_timer; - /** - * When the latest WAL does not contain a EOF marker, - * re-read its tail on every change in file metadata. - */ - ev_stat stat; - /** Path to the file being watched with 'stat'. */ - char filename[PATH_MAX+1]; -}; - -static struct wal_watcher wal_watcher; - static void recovery_rescan_file(ev_loop *, ev_stat *w, int /* revents */); static void @@ -600,7 +586,7 @@ static void recovery_rescan_dir(ev_loop * loop, ev_timer *w, int /* revents */) { struct recovery_state *r = (struct recovery_state *) w->data; - struct wal_watcher *watcher = r->watcher; + struct wal_watcher *watcher = &r->watcher; struct xlog *save_current_wal = r->current_wal; /** @@ -610,7 +596,13 @@ recovery_rescan_dir(ev_loop * loop, ev_timer *w, int /* revents */) * user. */ fiber_set_user(fiber(), &admin_credentials); - int result = recover_remaining_wals(r); + int result; + try { + result = recover_remaining_wals(r); + } catch (Exception *e) { + e->log(); + result = -1; + } fiber_set_user(fiber(), NULL); if (result < 0) panic("recover failed: %i", result); @@ -626,7 +618,7 @@ static void recovery_rescan_file(ev_loop * loop, ev_stat *w, int /* revents */) { struct recovery_state *r = (struct recovery_state *) w->data; - struct wal_watcher *watcher = r->watcher; + struct wal_watcher *watcher = &r->watcher; fiber_set_user(fiber(), &admin_credentials); try { if (recover_xlog(r, r->current_wal) == LOG_EOF) { @@ -643,14 +635,22 @@ recovery_rescan_file(ev_loop * loop, ev_stat *w, int /* revents */) fiber_set_user(fiber(), NULL); } +static void +wal_watcher_init(struct wal_watcher *watcher) +{ + watcher->filename[0] = '\0'; + ev_init(&watcher->dir_timer, recovery_rescan_dir); + ev_init(&watcher->stat, recovery_rescan_file); +} + + void recovery_follow_local(struct recovery_state *r, ev_tstamp wal_dir_rescan_delay) { - assert(r->watcher == NULL); assert(r->writer == NULL); ev_loop *loop = loop(); - struct wal_watcher *watcher = r->watcher= &wal_watcher; + struct wal_watcher *watcher = &r->watcher; ev_timer_init(&watcher->dir_timer, recovery_rescan_dir, wal_dir_rescan_delay, wal_dir_rescan_delay); @@ -665,15 +665,13 @@ recovery_follow_local(struct recovery_state *r, ev_tstamp wal_dir_rescan_delay) } static void -recovery_stop_local(struct recovery_state *r) +wal_watcher_stop(struct recovery_state *r) { - struct wal_watcher *watcher = r->watcher; - assert(ev_is_active(&watcher->dir_timer)); - ev_timer_stop(loop(), &watcher->dir_timer); + struct wal_watcher *watcher = &r->watcher; + if (ev_is_active(&watcher->dir_timer)) + ev_timer_stop(loop(), &watcher->dir_timer); if (ev_is_active(&watcher->stat)) ev_stat_stop(loop(), &watcher->stat); - - r->watcher = NULL; } /* }}} */ @@ -707,6 +705,7 @@ struct wal_writer bool is_rollback; ev_loop *txn_loop; struct vclock vclock; + bool is_started; }; static struct wal_writer wal_writer; @@ -827,6 +826,7 @@ wal_writer_init(struct wal_writer *writer, struct vclock *vclock, /* Create and fill writer->cluster hash */ vclock_create(&writer->vclock); vclock_copy(&writer->vclock, vclock); + writer->is_started = false; } /** Destroy a WAL writer structure. */ @@ -857,13 +857,13 @@ static int wal_writer_start(struct recovery_state *r, int rows_per_wal) { assert(r->writer == NULL); - assert(r->watcher == NULL); assert(r->current_wal == NULL); assert(rows_per_wal > 1); assert(! wal_writer.is_shutdown); assert(STAILQ_EMPTY(&wal_writer.input)); assert(STAILQ_EMPTY(&wal_writer.commit)); + assert(wal_writer.is_started == false); /* I. Initialize the state. */ wal_writer_init(&wal_writer, &r->vclock, rows_per_wal); r->writer = &wal_writer; @@ -877,6 +877,7 @@ wal_writer_start(struct recovery_state *r, int rows_per_wal) r->writer = NULL; return -1; } + wal_writer.is_started = true; return 0; } @@ -898,6 +899,7 @@ wal_writer_stop(struct recovery_state *r) } ev_async_stop(writer->txn_loop, &writer->write_event); + wal_writer.is_started = false; wal_writer_destroy(writer); r->writer = NULL; @@ -1218,7 +1220,7 @@ snapshot_save(struct recovery_state *r, snapshot_f snapshot_handler) { struct xlog *snap = xlog_create(&r->snap_dir, &r->vclock); if (snap == NULL) - panic_status(errno, "Failed to save snapshot: failed to open file in write mode."); + panic_status(errno, "failed to save snapshot: failed to open file in write mode."); /* * While saving a snapshot, snapshot name is set to * <lsn>.snap.inprogress. When done, the snapshot is diff --git a/src/box/recovery.h b/src/box/recovery.h index 76fac6b3e882429db9be82d950edb0d915fe755b..9b3187195fab3e11d83aa824e199012f5f7a0b3c 100644 --- a/src/box/recovery.h +++ b/src/box/recovery.h @@ -90,6 +90,26 @@ struct remote { socklen_t addr_len; }; +/** + * This is used in local hot standby or replication + * relay mode: look for changes in the wal_dir and apply them + * locally or send to the replica. + */ +struct wal_watcher { + /** + * Rescan the WAL directory in search for new WAL files + * every wal_dir_rescan_delay seconds. + */ + ev_timer dir_timer; + /** + * When the latest WAL does not contain a EOF marker, + * re-read its tail on every change in file metadata. + */ + ev_stat stat; + /** Path to the file being watched with 'stat'. */ + char filename[PATH_MAX+1]; +}; + struct recovery_state { struct vclock vclock; /** The WAL we're currently reading/writing from/to. */ @@ -99,7 +119,7 @@ struct recovery_state { /** Used to find missing xlog files */ int64_t signature; struct wal_writer *writer; - struct wal_watcher *watcher; + struct wal_watcher watcher; union { /** slave->master state */ struct remote remote; diff --git a/src/box/xlog.cc b/src/box/xlog.cc index 91cdf546e557d53c019e05eb40326a265788e4e0..b8e4960426c0c5cae4ee8b8011fb2d2efd1ce8b2 100644 --- a/src/box/xlog.cc +++ b/src/box/xlog.cc @@ -87,6 +87,7 @@ xdir_create(struct xdir *dir, const char *dirname, dir->filetype = "XLOG\n"; dir->filename_ext = ".xlog"; } + dir->type = type; } /** diff --git a/src/box/xlog.h b/src/box/xlog.h index df549ed8acd77833d611d8be07026c7c6d9bbc07..88629c0962e210904d618dfd57b467d1e05e294d 100644 --- a/src/box/xlog.h +++ b/src/box/xlog.h @@ -111,6 +111,7 @@ struct xdir { * Directory path. */ char dirname[PATH_MAX+1]; + enum xdir_type type; }; /**