diff --git a/include/recovery.h b/include/recovery.h index a9741d27a78a907c8440f331c64a39a43d5abae1..0e46a5da0e6b1b325d97e19a6a730acc3af97293 100644 --- a/include/recovery.h +++ b/include/recovery.h @@ -107,7 +107,8 @@ void recovery_init(const char *snap_dirname, const char *xlog_dirname, void recovery_update_mode(const char *wal_mode, double fsync_delay); void recovery_update_io_rate_limit(double new_limit); void recovery_free(); -void recover(struct recovery_state *, i64 lsn); +void recover_snap(struct recovery_state *); +void recover_existing_wals(struct recovery_state *); void recovery_follow_local(struct recovery_state *r, ev_tstamp wal_dir_rescan_delay); void recovery_finalize(struct recovery_state *r); int wal_write(struct recovery_state *r, i64 lsn, u64 cookie, diff --git a/mod/box/box.m b/mod/box/box.m index fc0763735e1f2ddfdfed8b913f0a06ab1bb35ce9..bf11ff7de98b23176c03a37409dec23bb03eddfb 100644 --- a/mod/box/box.m +++ b/mod/box/box.m @@ -64,11 +64,6 @@ struct box_snap_row { u8 data[]; } __attribute__((packed)); -static enum { - RECOVERY_PHASE_1, - RECOVERY_PHASE_2, -} recovery_phase; - static inline struct box_snap_row * box_snap_row(const struct tbuf *t) { @@ -250,42 +245,21 @@ xlog_print(struct tbuf *t) return 0; } -static void -recovery_phase_1(void) -{ - recovery_phase = RECOVERY_PHASE_1; - say_info("begin snapshot recovery"); - begin_build_primary_indexes(); -} - -static void -recovery_phase_2(void) -{ - assert(recovery_phase == RECOVERY_PHASE_1); - recovery_phase = RECOVERY_PHASE_2; - say_info("end snapshot recovery and building primary indexes"); - end_build_primary_indexes(); -} - static void recover_snap_row(struct tbuf *t) { + assert(primary_indexes_enabled == false); + struct box_snap_row *row = box_snap_row(t); struct tuple *tuple = tuple_alloc(row->data_size); memcpy(tuple->data, row->data, row->data_size); tuple->field_count = row->tuple_size; - tuple_ref(tuple, 1); - @try { - struct space *space = space_find(row->space); - Index *index = space->index[0]; - [index buildNext: tuple]; - } - @catch(...) { - tuple_ref(tuple, -1); - @throw; - } + struct space *space = space_find(row->space); + Index *index = space->index[0]; + [index buildNext: tuple]; + tuple_ref(tuple, 1); } static int @@ -299,12 +273,8 @@ recover_row(struct tbuf *t) u16 tag = read_u16(t); read_u64(t); /* drop cookie */ if (tag == SNAP) { - assert(recovery_phase == RECOVERY_PHASE_1); recover_snap_row(t); } else if (tag == XLOG) { - if (recovery_phase == RECOVERY_PHASE_1) { - recovery_phase_2(); - } u16 op = read_u16(t); struct txn *txn = txn_begin(); txn->txn_flags |= BOX_NOT_STORE; @@ -507,11 +477,10 @@ mod_init(void) if (init_storage) return; - recovery_phase_1(); - recover(recovery_state, 0); - if (recovery_phase == RECOVERY_PHASE_1) { - recovery_phase_2(); - } + begin_build_primary_indexes(); + recover_snap(recovery_state); + end_build_primary_indexes(); + recover_existing_wals(recovery_state); stat_cleanup(stat_base, requests_MAX); @@ -571,7 +540,8 @@ snapshot_write_tuple(struct log_io *l, struct nbatch *batch, void mod_snapshot(struct log_io *l, struct nbatch *batch) { - if (recovery_phase == RECOVERY_PHASE_1) + /* --init-storage switch */ + if (primary_indexes_enabled == false) return; for (uint32_t n = 0; n < BOX_SPACE_MAX; ++n) { diff --git a/mod/box/index.h b/mod/box/index.h index f58579d5ceb08e01a783f5e7aa0be88cb32f9f47..2255db9a3e5f8466b9c11aff0aa18baf9a1db50c 100644 --- a/mod/box/index.h +++ b/mod/box/index.h @@ -108,11 +108,12 @@ struct key_def { /** Destroy and free index instance. */ - (void) free; /** - * Finish index construction. + * Two-phase index creation: begin building, add tuples, finish. */ -- (void) buildBegin; +- (void) beginBuild; - (void) buildNext: (struct tuple *)tuple; -- (void) buildEnd; +- (void) endBuild; +/** Build this index based on the contents of another index. */ - (void) build: (Index *) pk; - (size_t) size; - (struct tuple *) min; diff --git a/mod/box/index.m b/mod/box/index.m index 38f252c6dae079a419529772fe13f2c367dfff85..56f506d0cc021c647ad7acaad64501f630b3023c 100644 --- a/mod/box/index.m +++ b/mod/box/index.m @@ -122,7 +122,7 @@ iterator_first_equal(struct iterator *it) [super free]; } -- (void) buildBegin +- (void) beginBuild { [self subclassResponsibility: _cmd]; } @@ -133,7 +133,7 @@ iterator_first_equal(struct iterator *it) [self subclassResponsibility: _cmd]; } -- (void) buildEnd +- (void) endBuild { [self subclassResponsibility: _cmd]; } @@ -286,7 +286,7 @@ hash_iterator_free(struct iterator *iterator) [self subclassResponsibility: _cmd]; } -- (void) buildBegin +- (void) beginBuild { } @@ -295,7 +295,7 @@ hash_iterator_free(struct iterator *iterator) [self replace: NULL :tuple]; } -- (void) buildEnd +- (void) endBuild { } diff --git a/mod/box/space.h b/mod/box/space.h index 08555a75b316b2919e1169d764dcde32d8822650..4ca67a5c0b346da89dafcdeb956c85ad478678c3 100644 --- a/mod/box/space.h +++ b/mod/box/space.h @@ -124,6 +124,10 @@ index_is_primary(Index *index) * already built and ready for use. */ extern bool secondary_indexes_enabled; +/** + * Primary indexes are enabled only after reading the snapshot. + */ +extern bool primary_indexes_enabled; static inline int index_count(struct space *sp) diff --git a/mod/box/space.m b/mod/box/space.m index f02e3688890a531df2bfeb2ae62ae4b43c3ae750..276686071d44dbe4c654c7ecc199c65e4499ffd9 100644 --- a/mod/box/space.m +++ b/mod/box/space.m @@ -40,6 +40,7 @@ struct space *spaces = NULL; bool secondary_indexes_enabled = false; +bool primary_indexes_enabled = false; /** Free a key definition. */ static void key_free(struct key_def *key_def) @@ -304,12 +305,13 @@ space_init(void) void begin_build_primary_indexes(void) { + assert(primary_indexes_enabled == false); for (u32 n = 0; n < BOX_SPACE_MAX; ++n) { if (spaces[n].enabled == false) continue; Index *pk = spaces[n].index[0]; - [pk buildBegin]; + [pk beginBuild]; } } @@ -321,13 +323,15 @@ end_build_primary_indexes(void) continue; Index *pk = spaces[n].index[0]; - [pk buildEnd]; + [pk endBuild]; } + primary_indexes_enabled = true; } void build_secondary_indexes(void) { + assert(primary_indexes_enabled == true); assert(secondary_indexes_enabled == false); for (u32 n = 0; n < BOX_SPACE_MAX; ++n) { diff --git a/mod/box/tree.m b/mod/box/tree.m index c313bcfd7a3daf54c949878f3f11c1a324c053a8..1c25acd800a6a6a995343d3b5c624153f903eb20 100644 --- a/mod/box/tree.m +++ b/mod/box/tree.m @@ -972,7 +972,7 @@ tree_iterator_free(struct iterator *iterator) } } -- (void) buildBegin +- (void) beginBuild { assert(index_is_primary(self)); @@ -1006,7 +1006,7 @@ tree_iterator_free(struct iterator *iterator) tree.size++; } -- (void) buildEnd +- (void) endBuild { assert(index_is_primary(self)); diff --git a/src/recovery.m b/src/recovery.m index ed0290018194f0ade5ff81d6d3aba3975a1cca3e..a29b39709c24c397cb3ad43dc04ba0e370a94f3f 100644 --- a/src/recovery.m +++ b/src/recovery.m @@ -258,26 +258,27 @@ recovery_setup_panic(struct recovery_state *r, bool on_snap_error, bool on_wal_e /** * Read a snapshot and call row_handler for every snapshot row. - * - * @retval 0 success - * @retval -1 failure + * Panic in case of error. */ -static int +void recover_snap(struct recovery_state *r) { - int res = -1; + /* current_wal isn't open during initial recover. */ + assert(r->current_wal == NULL); + say_info("recovery start"); + struct log_io *snap; i64 lsn; lsn = greatest_lsn(r->snap_dir); if (lsn <= 0) { say_error("can't find snapshot"); - return -1; + goto error; } snap = log_io_open_for_read(r->snap_dir, lsn, NONE); if (snap == NULL) { say_error("can't find/open snapshot"); - return -1; + goto error; } say_info("recover from `%s'", snap->filename); struct log_io_cursor i; @@ -288,15 +289,24 @@ recover_snap(struct recovery_state *r) while ((row = log_io_cursor_next(&i))) { if (r->row_handler(row) < 0) { say_error("can't apply row"); - goto end; + break; } } - r->lsn = r->confirmed_lsn = lsn; - res = 0; -end: log_io_cursor_close(&i); log_io_close(&snap); - return res; + + if (row == NULL) { + r->lsn = r->confirmed_lsn = lsn; + say_info("snapshot recovered, confirmed lsn: %" + PRIi64, r->confirmed_lsn); + return; + } +error: + if (greatest_lsn(r->snap_dir) <= 0) { + say_crit("didn't you forget to initialize storage with --init-storage switch?"); + _exit(1); + } + panic("snapshot recovery failed"); } #define LOG_EOF 0 @@ -465,48 +475,16 @@ recover_current_wal: return result; } +/** + * Recover all WALs created after the last snapshot. Panic if + * error. + */ void -recover(struct recovery_state *r, i64 lsn) +recover_existing_wals(struct recovery_state *r) { - /* * current_wal isn't open during initial recover. */ - assert(r->current_wal == NULL); - /* - * If the caller sets confirmed_lsn to a non-zero value, - * snapshot recovery is skipped and we proceed directly to - * finding the WAL with the respective LSN and continue - * recovery from this WAL. @fixme: this is a gotcha, due - * to whihc a replica is unable to read data from a master - * if the replica has no snapshot or the master has no WAL - * with the requested LSN. - */ - say_info("recovery start"); - if (lsn == 0) { - if (recover_snap(r) != 0) { - if (greatest_lsn(r->snap_dir) <= 0) { - say_crit("didn't you forget to initialize storage with --init-storage switch?"); - _exit(1); - } - panic("snapshot recovery failed"); - } - say_info("snapshot recovered, confirmed lsn: %" - PRIi64, r->confirmed_lsn); - } else { - /* - * Note that recovery starts with lsn _NEXT_ to - * the confirmed one. - */ - r->lsn = r->confirmed_lsn = lsn - 1; - } i64 next_lsn = r->confirmed_lsn + 1; i64 wal_lsn = find_including_file(r->wal_dir, next_lsn); if (wal_lsn <= 0) { - if (lsn != 0) { - /* - * Recovery for replication relay, did not - * find the requested LSN. - */ - say_error("can't find WAL containing record with lsn: %" PRIi64, next_lsn); - } /* No WALs to recover from. */ goto out; } diff --git a/src/replication.m b/src/replication.m index 167ed11e0f8d9cbe5ea63056bbc4b781f68d42a5..88ce28bbd3cb972ccbad067146aea11aebe6ebf4 100644 --- a/src/replication.m +++ b/src/replication.m @@ -637,8 +637,15 @@ replication_relay_loop(int client_sock) recovery_init(cfg.snap_dir, cfg.wal_dir, replication_relay_send_row, INT32_MAX, "fsync_delay", 0, RECOVER_READONLY); - - recover(recovery_state, lsn); + /* + * Note that recovery starts with lsn _NEXT_ to + * the confirmed one. + */ + recovery_state->lsn = recovery_state->confirmed_lsn = lsn - 1; + recover_existing_wals(recovery_state); + /* Found nothing. */ + if (recovery_state->lsn == lsn - 1) + say_error("can't find WAL containing record with lsn: %" PRIi64, lsn); recovery_follow_local(recovery_state, 0.1); ev_loop(0);