From ea65fb11cee0dac848f07736f9eacca410cc035e Mon Sep 17 00:00:00 2001 From: Konstantin Osipov <kostja@tarantool.org> Date: Sat, 4 Apr 2015 00:54:35 +0300 Subject: [PATCH] sophia: replication, remove vclock_join Use a correct server id to relay snapshot records to a replica (server id 0 is server id of the snapshot). Move recover_snap() implementation to memtx_engine. --- src/box/memtx_engine.cc | 47 +++++++++++++++++++++++++++++++++++++- src/box/recovery.cc | 49 +--------------------------------------- src/box/recovery.h | 4 ++-- src/box/replication.cc | 2 +- src/box/sophia_engine.cc | 2 +- 5 files changed, 51 insertions(+), 53 deletions(-) diff --git a/src/box/memtx_engine.cc b/src/box/memtx_engine.cc index 1a0a3e9b15..362a358107 100644 --- a/src/box/memtx_engine.cc +++ b/src/box/memtx_engine.cc @@ -49,6 +49,7 @@ #include "coeio_file.h" #include "coio.h" #include "errinj.h" +#include "scoped_guard.h" /** For all memory used by all indexes. */ extern struct quota memtx_quota; @@ -211,13 +212,57 @@ MemtxEngine::MemtxEngine() ENGINE_AUTO_CHECK_UPDATE; } +/** + * Read a snapshot and call apply_row for every snapshot row. + * Panic in case of error. + * + * @pre there is an existing snapshot. Otherwise + * recovery_bootstrap() should be used instead. + */ +void +recover_snap(struct recovery_state *r) +{ + /* There's no current_wal during initial recover. */ + assert(r->current_wal == NULL); + say_info("recovery start"); + /** + * Don't rescan the directory, it's done when + * recovery is initialized. + */ + struct vclock *res = vclockset_last(&r->snap_dir.index); + /* + * The only case when the directory index is empty is + * when someone has deleted a snapshot and tries to join + * as a replica. Our best effort is to not crash in such case. + */ + if (res == NULL) + tnt_raise(ClientError, ER_MISSING_SNAPSHOT); + int64_t signature = vclock_signature(res); + + struct xlog *snap = xlog_open(&r->snap_dir, signature, NONE); + auto guard = make_scoped_guard([=]{ + xlog_close(snap); + }); + /* Save server UUID */ + r->server_uuid = snap->server_uuid; + + /* Add a surrogate server id for snapshot rows */ + vclock_add_server(&r->vclock, 0); + + say_info("recovering from `%s'", snap->filename); + recover_xlog(r, snap); +} + /** Called at start to tell memtx to recover to a given LSN. */ void MemtxEngine::recoverToCheckpoint(int64_t /* lsn */) { + struct recovery_state *r = ::recovery; m_state = MEMTX_READING_SNAPSHOT; /* Process existing snapshot */ - recover_snap(::recovery); + recover_snap(r); + /* Replace server vclock using the data from snapshot */ + vclock_copy(&r->vclock, vclockset_last(&r->snap_dir.index)); m_state = MEMTX_READING_WAL; space_foreach(memtx_end_build_primary_key, this); } diff --git a/src/box/recovery.cc b/src/box/recovery.cc index a4b8d662ac..c2c1b1473d 100644 --- a/src/box/recovery.cc +++ b/src/box/recovery.cc @@ -181,7 +181,6 @@ recovery_new(const char *snap_dirname, const char *wal_dirname, xdir_create(&r->wal_dir, wal_dirname, XLOG, &r->server_uuid); vclock_create(&r->vclock); - vclock_create(&r->vclock_join); xdir_scan(&r->snap_dir); /** @@ -274,7 +273,7 @@ recovery_apply_row(struct recovery_state *r, struct xrow_header *row) * @retval 0 OK, read full xlog. * @retval 1 OK, read some but not all rows, or no EOF marker */ -static int +int recover_xlog(struct recovery_state *r, struct xlog *l) { struct xlog_cursor i; @@ -337,52 +336,6 @@ recovery_bootstrap(struct recovery_state *r) recover_xlog(r, snap); } -/** - * Read a snapshot and call apply_row for every snapshot row. - * Panic in case of error. - * - * @pre there is an existing snapshot. Otherwise - * recovery_bootstrap() should be used instead. - */ -void -recover_snap(struct recovery_state *r) -{ - /* There's no current_wal during initial recover. */ - assert(r->current_wal == NULL); - say_info("recovery start"); - /** - * Don't rescan the directory, it's done when - * recovery is initialized. - */ - struct vclock *res = vclockset_last(&r->snap_dir.index); - /* - * The only case when the directory index is empty is - * when someone has deleted a snapshot and tries to join - * as a replica. Our best effort is to not crash in such case. - */ - if (res == NULL) - tnt_raise(ClientError, ER_MISSING_SNAPSHOT); - int64_t signature = vclock_signature(res); - - struct xlog *snap = xlog_open(&r->snap_dir, signature, NONE); - auto guard = make_scoped_guard([=]{ - xlog_close(snap); - }); - /* Save server UUID */ - r->server_uuid = snap->server_uuid; - - /* Add a surrogate server id for snapshot rows */ - vclock_add_server(&r->vclock, 0); - - say_info("recovering from `%s'", snap->filename); - recover_xlog(r, snap); - - /* Replace server vclock using the data from snapshot */ - vclock_copy(&r->vclock_join, &r->vclock); - vclock_copy(&r->vclock, &snap->vclock); -} - - /** Find out if there are new .xlog files since the current * LSN, and read them all up. * diff --git a/src/box/recovery.h b/src/box/recovery.h index ad4eabf307..540b4b1596 100644 --- a/src/box/recovery.h +++ b/src/box/recovery.h @@ -79,7 +79,6 @@ struct remote { }; struct recovery_state { - struct vclock vclock_join; struct vclock vclock; /** The WAL we're currently reading/writing from/to. */ struct xlog *current_wal; @@ -134,7 +133,8 @@ recovery_has_data(struct recovery_state *r) vclockset_first(&r->wal_dir.index) != NULL; } void recovery_bootstrap(struct recovery_state *r); -void recover_snap(struct recovery_state *r); +int +recover_xlog(struct recovery_state *r, struct xlog *l); void recovery_follow_local(struct recovery_state *r, const char *name, ev_tstamp wal_dir_rescan_delay); diff --git a/src/box/replication.cc b/src/box/replication.cc index aec788cb7d..f6742554da 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -88,7 +88,7 @@ replication_join_f(va_list ap) /* Send response to JOIN command = end of stream */ struct xrow_header row; - xrow_encode_vclock(&row, &r->vclock); + xrow_encode_vclock(&row, vclockset_last(&r->snap_dir.index)); row.sync = relay->sync; struct iovec iov[XROW_IOVMAX]; int iovcnt = xrow_to_iovec(&row, iov); diff --git a/src/box/sophia_engine.cc b/src/box/sophia_engine.cc index 09547375d7..6330a411f9 100644 --- a/src/box/sophia_engine.cc +++ b/src/box/sophia_engine.cc @@ -145,8 +145,8 @@ sophia_send_row(Relay *relay, uint32_t space_id, char *tuple, body.k_tuple = IPROTO_TUPLE; struct xrow_header row; row.type = IPROTO_INSERT; - row.lsn = vclock_inc(&r->vclock_join, r->server_id); row.server_id = 0; + row.lsn = vclock_inc(&r->vclock, row.server_id); row.bodycnt = 2; row.body[0].iov_base = &body; row.body[0].iov_len = sizeof(body); -- GitLab