diff --git a/src/box/memtx_engine.cc b/src/box/memtx_engine.cc index 1a0a3e9b15716969071a4f80b83cdc22d32552d9..362a358107c7cdfbfc99873900ac6b77f8244071 100644 --- a/src/box/memtx_engine.cc +++ b/src/box/memtx_engine.cc @@ -49,6 +49,7 @@ #include "coeio_file.h" #include "coio.h" #include "errinj.h" +#include "scoped_guard.h" /** For all memory used by all indexes. */ extern struct quota memtx_quota; @@ -211,13 +212,57 @@ MemtxEngine::MemtxEngine() ENGINE_AUTO_CHECK_UPDATE; } +/** + * Read a snapshot and call apply_row for every snapshot row. + * Panic in case of error. + * + * @pre there is an existing snapshot. Otherwise + * recovery_bootstrap() should be used instead. + */ +void +recover_snap(struct recovery_state *r) +{ + /* There's no current_wal during initial recover. */ + assert(r->current_wal == NULL); + say_info("recovery start"); + /** + * Don't rescan the directory, it's done when + * recovery is initialized. + */ + struct vclock *res = vclockset_last(&r->snap_dir.index); + /* + * The only case when the directory index is empty is + * when someone has deleted a snapshot and tries to join + * as a replica. Our best effort is to not crash in such case. + */ + if (res == NULL) + tnt_raise(ClientError, ER_MISSING_SNAPSHOT); + int64_t signature = vclock_signature(res); + + struct xlog *snap = xlog_open(&r->snap_dir, signature, NONE); + auto guard = make_scoped_guard([=]{ + xlog_close(snap); + }); + /* Save server UUID */ + r->server_uuid = snap->server_uuid; + + /* Add a surrogate server id for snapshot rows */ + vclock_add_server(&r->vclock, 0); + + say_info("recovering from `%s'", snap->filename); + recover_xlog(r, snap); +} + /** Called at start to tell memtx to recover to a given LSN. */ void MemtxEngine::recoverToCheckpoint(int64_t /* lsn */) { + struct recovery_state *r = ::recovery; m_state = MEMTX_READING_SNAPSHOT; /* Process existing snapshot */ - recover_snap(::recovery); + recover_snap(r); + /* Replace server vclock using the data from snapshot */ + vclock_copy(&r->vclock, vclockset_last(&r->snap_dir.index)); m_state = MEMTX_READING_WAL; space_foreach(memtx_end_build_primary_key, this); } diff --git a/src/box/recovery.cc b/src/box/recovery.cc index a4b8d662ac29bbf2f192a37dc238ceb93666b695..c2c1b1473d069d706a11855dc7a1ac44efc4b096 100644 --- a/src/box/recovery.cc +++ b/src/box/recovery.cc @@ -181,7 +181,6 @@ recovery_new(const char *snap_dirname, const char *wal_dirname, xdir_create(&r->wal_dir, wal_dirname, XLOG, &r->server_uuid); vclock_create(&r->vclock); - vclock_create(&r->vclock_join); xdir_scan(&r->snap_dir); /** @@ -274,7 +273,7 @@ recovery_apply_row(struct recovery_state *r, struct xrow_header *row) * @retval 0 OK, read full xlog. * @retval 1 OK, read some but not all rows, or no EOF marker */ -static int +int recover_xlog(struct recovery_state *r, struct xlog *l) { struct xlog_cursor i; @@ -337,52 +336,6 @@ recovery_bootstrap(struct recovery_state *r) recover_xlog(r, snap); } -/** - * Read a snapshot and call apply_row for every snapshot row. - * Panic in case of error. - * - * @pre there is an existing snapshot. Otherwise - * recovery_bootstrap() should be used instead. - */ -void -recover_snap(struct recovery_state *r) -{ - /* There's no current_wal during initial recover. */ - assert(r->current_wal == NULL); - say_info("recovery start"); - /** - * Don't rescan the directory, it's done when - * recovery is initialized. - */ - struct vclock *res = vclockset_last(&r->snap_dir.index); - /* - * The only case when the directory index is empty is - * when someone has deleted a snapshot and tries to join - * as a replica. Our best effort is to not crash in such case. - */ - if (res == NULL) - tnt_raise(ClientError, ER_MISSING_SNAPSHOT); - int64_t signature = vclock_signature(res); - - struct xlog *snap = xlog_open(&r->snap_dir, signature, NONE); - auto guard = make_scoped_guard([=]{ - xlog_close(snap); - }); - /* Save server UUID */ - r->server_uuid = snap->server_uuid; - - /* Add a surrogate server id for snapshot rows */ - vclock_add_server(&r->vclock, 0); - - say_info("recovering from `%s'", snap->filename); - recover_xlog(r, snap); - - /* Replace server vclock using the data from snapshot */ - vclock_copy(&r->vclock_join, &r->vclock); - vclock_copy(&r->vclock, &snap->vclock); -} - - /** Find out if there are new .xlog files since the current * LSN, and read them all up. * diff --git a/src/box/recovery.h b/src/box/recovery.h index ad4eabf3070cfef8a82ca0fe7d602462715cc34b..540b4b159694e4b9c2d59466247b1c0d64323414 100644 --- a/src/box/recovery.h +++ b/src/box/recovery.h @@ -79,7 +79,6 @@ struct remote { }; struct recovery_state { - struct vclock vclock_join; struct vclock vclock; /** The WAL we're currently reading/writing from/to. */ struct xlog *current_wal; @@ -134,7 +133,8 @@ recovery_has_data(struct recovery_state *r) vclockset_first(&r->wal_dir.index) != NULL; } void recovery_bootstrap(struct recovery_state *r); -void recover_snap(struct recovery_state *r); +int +recover_xlog(struct recovery_state *r, struct xlog *l); void recovery_follow_local(struct recovery_state *r, const char *name, ev_tstamp wal_dir_rescan_delay); diff --git a/src/box/replication.cc b/src/box/replication.cc index aec788cb7d166af96b32014cca054ea11fa4d885..f6742554da8a6b60d6516f62fe1cac64d879cd74 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -88,7 +88,7 @@ replication_join_f(va_list ap) /* Send response to JOIN command = end of stream */ struct xrow_header row; - xrow_encode_vclock(&row, &r->vclock); + xrow_encode_vclock(&row, vclockset_last(&r->snap_dir.index)); row.sync = relay->sync; struct iovec iov[XROW_IOVMAX]; int iovcnt = xrow_to_iovec(&row, iov); diff --git a/src/box/sophia_engine.cc b/src/box/sophia_engine.cc index 09547375d789582f462be03ff0fd27a7760c9f2a..6330a411f9f78205b7fbf995633a52adcbce2154 100644 --- a/src/box/sophia_engine.cc +++ b/src/box/sophia_engine.cc @@ -145,8 +145,8 @@ sophia_send_row(Relay *relay, uint32_t space_id, char *tuple, body.k_tuple = IPROTO_TUPLE; struct xrow_header row; row.type = IPROTO_INSERT; - row.lsn = vclock_inc(&r->vclock_join, r->server_id); row.server_id = 0; + row.lsn = vclock_inc(&r->vclock, row.server_id); row.bodycnt = 2; row.body[0].iov_base = &body; row.body[0].iov_len = sizeof(body);