diff --git a/src/box/box.cc b/src/box/box.cc
index ca114c8dd14302a8af731221497e6c7b3d75bbe1..162fdbfe122348f50ab5c539d4cfbb8b5faf3446 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -227,7 +227,7 @@ box_leave_local_standby_mode(void *data __attribute__((unused)))
 		 */
 		return;
 	}
-	recovery_finalize(recovery);
+	recovery_finalize(recovery, cfg_geti("rows_per_wal"));
 
 	/*
 	 * notify engines about end of recovery.
@@ -417,9 +417,7 @@ box_init()
 
 	/* recovery initialization */
 	recovery = recovery_new(cfg_gets("snap_dir"), cfg_gets("wal_dir"),
-				recover_row, NULL,
-				box_snapshot_cb,
-				cfg_geti("rows_per_wal"));
+				recover_row, NULL);
 
 	recovery_set_remote(recovery, cfg_gets("replication_source"));
 	recovery_update_io_rate_limit(recovery,
@@ -436,14 +434,14 @@ box_init()
 			/* Initialize a new replica */
 			replica_bootstrap(recovery);
 			space_end_recover_snapshot();
-			snapshot_save(recovery);
+			snapshot_save(recovery, box_snapshot_cb);
 		} else {
 			/* Initialize the first server of a new cluster */
 			recovery_bootstrap(recovery);
 			box_set_cluster_uuid();
 			box_set_server_uuid();
 			space_end_recover_snapshot();
-			snapshot_save(recovery);
+			snapshot_save(recovery, box_snapshot_cb);
 		}
 		fiber_gc();
 	} catch (Exception *e) {
@@ -564,7 +562,7 @@ box_snapshot(void)
 	 * parent stdio buffers at exit().
 	 */
 	close_all_xcpt(1, log_fd);
-	snapshot_save(recovery);
+	snapshot_save(recovery, box_snapshot_cb);
 
 	exit(EXIT_SUCCESS);
 	return 0;
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index 68b19d72b77261916e97520803f5e3448e6a06f9..729276c412d314cbca2990226c5fbbc6d61757c8 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -145,21 +145,27 @@ fill_lsn(struct recovery_state *r, struct xrow_header *row)
 /* {{{ Initial recovery */
 
 static int
-wal_writer_start(struct recovery_state *state);
+wal_writer_start(struct recovery_state *state, int rows_per_wal);
 void
 wal_writer_stop(struct recovery_state *r);
 static void
 recovery_stop_local(struct recovery_state *r);
 
+/**
+ * Throws an exception in case of error.
+ */
 struct recovery_state *
 recovery_new(const char *snap_dirname, const char *wal_dirname,
-	     row_handler row_handler, void *row_handler_param,
-	     snapshot_handler snapshot_handler,
-	     int rows_per_wal)
+	     apply_row_f apply_row, void *apply_row_param)
 {
 	struct recovery_state *r = (struct recovery_state *)
 			calloc(1, sizeof(*r));
+	if (r == NULL) {
+		tnt_raise(OutOfMemory, sizeof(*r), "malloc",
+			  "struct recovery");
+	}
+
 	auto guard = make_scoped_guard([=]{
 		free(r);
 	});
@@ -168,12 +174,10 @@ recovery_new(const char *snap_dirname, const char *wal_dirname,
 
 	assert(rows_per_wal > 1);
 
-	r->row_handler = row_handler;
-	r->row_handler_param = row_handler_param;
+	r->apply_row = apply_row;
+	r->apply_row_param = apply_row_param;
 
 	r->signature = -1;
 
-	r->snapshot_handler = snapshot_handler;
-
 	xdir_create(&r->snap_dir, snap_dirname, SNAP, &r->server_uuid);
 	xdir_create(&r->wal_dir, wal_dirname, XLOG, &r->server_uuid);
@@ -181,8 +185,6 @@ recovery_new(const char *snap_dirname, const char *wal_dirname,
 
 	if (r->wal_mode == WAL_FSYNC)
 		(void) strcat(r->wal_dir.open_wflags, "s");
-	r->rows_per_wal = rows_per_wal;
-
 	vclock_create(&r->vclock);
 
 	xdir_scan(&r->snap_dir);
@@ -214,6 +216,14 @@ recovery_update_io_rate_limit(struct recovery_state *r, double new_limit)
 		r->snap_io_rate_limit = UINT64_MAX;
 }
 
+void
+recovery_setup_panic(struct recovery_state *r, bool on_snap_error,
+		     bool on_wal_error)
+{
+	r->wal_dir.panic_if_error = on_wal_error;
+	r->snap_dir.panic_if_error = on_snap_error;
+}
+
 static inline void
 recovery_close_log(struct recovery_state *r)
@@ -243,24 +253,13 @@ recovery_delete(struct recovery_state *r)
 }
 
 void
-recovery_setup_panic(struct recovery_state *r, bool on_snap_error,
-		     bool on_wal_error)
-{
-	r->wal_dir.panic_if_error = on_wal_error;
-	r->snap_dir.panic_if_error = on_snap_error;
-}
-
-void
-recovery_process(struct recovery_state *r, struct xrow_header *row)
+recovery_apply_row(struct recovery_state *r, struct xrow_header *row)
 {
 	/* Check lsn */
-	int64_t current_signt = vclock_get(&r->vclock, row->server_id);
-	if (current_signt >=0 && row->lsn <= current_signt) {
-		say_debug("skipping too young row");
-		return;
-	}
-
-	return r->row_handler(r, r->row_handler_param, row);
+	int64_t current_lsn = vclock_get(&r->vclock, row->server_id);
+	assert(current_lsn >= 0);
+	if (row->lsn > current_lsn)
+		r->apply_row(r, r->apply_row_param, row);
 }
 
 #define LOG_EOF 0
@@ -283,7 +282,7 @@ recover_xlog(struct recovery_state *r, struct xlog *l)
 	struct xrow_header row;
 	while (xlog_cursor_next(&i, &row) == 0) {
 		try {
-			recovery_process(r, &row);
+			recovery_apply_row(r, &row);
 		} catch (ClientError *e) {
 			if (l->dir->panic_if_error)
 				throw;
@@ -324,7 +323,7 @@ recovery_bootstrap(struct recovery_state *r)
 }
 
 /**
- * Read a snapshot and call row_handler for every snapshot row.
+ * Read a snapshot and call apply_row for every snapshot row.
  * Panic in case of error.
 *
 * @pre there is an existing snapshot. Otherwise
@@ -375,7 +374,7 @@ recover_remaining_wals(struct recovery_state *r)
 {
	int result = 0;
	struct xlog *next_wal;
-	int64_t current_signt, last_signt;
+	int64_t current_signature, last_signature;
	struct vclock *current_vclock;
	size_t rows_before;
	enum log_suffix suffix;
@@ -388,7 +387,7 @@ recover_remaining_wals(struct recovery_state *r)
 	}
 
 	current_vclock = vclockset_last(&r->wal_dir.index);
-	last_signt = current_vclock != NULL ?
+	last_signature = current_vclock != NULL ?
 		vclock_signature(current_vclock) : -1;
 	/* if the caller already opened WAL for us, recover from it first */
 	if (r->current_wal != NULL)
@@ -399,13 +398,13 @@ recover_remaining_wals(struct recovery_state *r)
 		if (current_vclock == NULL)
 			break; /* No more WALs */
 
-		current_signt = vclock_signature(current_vclock);
-		if (current_signt == r->signature) {
-			if (current_signt == last_signt)
+		current_signature = vclock_signature(current_vclock);
+		if (current_signature == r->signature) {
+			if (current_signature == last_signature)
 				break;
 			say_error("missing xlog between %020lld and %020lld",
-				  (long long) current_signt,
-				  (long long) last_signt);
+				  (long long) current_signature,
+				  (long long) last_signature);
 			if (r->wal_dir.panic_if_error)
 				break;
 
@@ -413,9 +412,9 @@ recover_remaining_wals(struct recovery_state *r)
 			say_warn("ignoring missing WALs");
 			current_vclock = vclockset_next(&r->wal_dir.index,
 							current_vclock);
-			/* current_signt != last_signt */
+			/* current_signature != last_signature */
 			assert(current_vclock != NULL);
-			current_signt = vclock_signature(current_vclock);
+			current_signature = vclock_signature(current_vclock);
 		}
 
 		/*
@@ -424,9 +423,9 @@ recover_remaining_wals(struct recovery_state *r)
 		 * .xlog, with no risk of a concurrent
 		 * xlog_rename().
 		 */
-		suffix = current_signt == last_signt ? INPROGRESS : NONE;
+		suffix = current_signature == last_signature ? INPROGRESS : NONE;
 		try {
-			next_wal = xlog_open(&r->wal_dir, current_signt, suffix);
+			next_wal = xlog_open(&r->wal_dir, current_signature, suffix);
 		} catch (XlogError *e) {
 			e->log();
 			/*
@@ -440,14 +439,14 @@ recover_remaining_wals(struct recovery_state *r)
 			 * delete it.
 			 */
 			if (inprogress_log_unlink(format_filename(
-				&r->wal_dir, current_signt, INPROGRESS)) != 0)
+				&r->wal_dir, current_signature, INPROGRESS)) != 0)
 				panic("can't unlink 'inprogres' WAL");
 			}
 			result = 0;
 			break;
 		}
 		assert(r->current_wal == NULL);
-		r->signature = current_signt;
+		r->signature = current_signature;
 		r->current_wal = next_wal;
 
 		say_info("recover from `%s'", r->current_wal->filename);
@@ -476,7 +475,7 @@ recover_remaining_wals(struct recovery_state *r)
 			say_info("done `%s'", r->current_wal->filename);
 			recovery_close_log(r);
 			/* goto find_next_wal; */
-		} else if (r->signature == last_signt) {
+		} else if (r->signature == last_signature) {
 			/* last file is not finished */
 			break;
 		} else if (r->finalize && r->current_wal->is_inprogress) {
@@ -503,7 +502,7 @@ recover_remaining_wals(struct recovery_state *r)
 	 * It's not a fatal error when last WAL is empty, but if
 	 * we lose some logs it is a fatal error.
 	 */
-	if (last_signt > r->signature) {
+	if (last_signature > r->signature) {
 		say_error("not all WALs have been successfully read");
 		result = -1;
 	}
@@ -513,7 +512,7 @@
 }
 
 void
-recovery_finalize(struct recovery_state *r)
+recovery_finalize(struct recovery_state *r, int rows_per_wal)
 {
 	int result;
 
@@ -551,7 +550,7 @@ recovery_finalize(struct recovery_state *r)
 		recovery_close_log(r);
 	}
 
-	wal_writer_start(r);
+	wal_writer_start(r, rows_per_wal);
 }
 
@@ -704,6 +703,7 @@ struct wal_writer
 	pthread_mutex_t mutex;
 	pthread_cond_t cond;
 	ev_async write_event;
+	int rows_per_wal;
 	struct fio_batch *batch;
 	bool is_shutdown;
 	bool is_rollback;
@@ -796,7 +796,8 @@ wal_schedule(ev_loop * /* loop */, ev_async *watcher, int /* event */)
 * more writers in the future.
 */
 static void
-wal_writer_init(struct wal_writer *writer, struct vclock *vclock)
+wal_writer_init(struct wal_writer *writer, struct vclock *vclock,
+		int rows_per_wal)
 {
 	/* I. Initialize the state. */
 	pthread_mutexattr_t errorcheck;
@@ -818,6 +819,7 @@ wal_writer_init(struct wal_writer *writer, struct vclock *vclock)
 	ev_async_init(&writer->write_event, wal_schedule);
 	writer->write_event.data = writer;
 	writer->txn_loop = loop();
+	writer->rows_per_wal = rows_per_wal;
 
 	writer->batch = fio_batch_alloc(sysconf(_SC_IOV_MAX));
 
@@ -854,7 +856,7 @@ static void *wal_writer_thread(void *worker_args);
 * points to a newly created WAL writer.
 */
 static int
-wal_writer_start(struct recovery_state *r)
+wal_writer_start(struct recovery_state *r, int rows_per_wal)
 {
 	assert(r->writer == NULL);
 	assert(r->watcher == NULL);
@@ -864,7 +866,7 @@ wal_writer_start(struct recovery_state *r)
 	assert(STAILQ_EMPTY(&wal_writer.commit));
 
 	/* I. Initialize the state. */
-	wal_writer_init(&wal_writer, &r->vclock);
+	wal_writer_init(&wal_writer, &r->vclock, rows_per_wal);
 	r->writer = &wal_writer;
 
 	ev_async_start(wal_writer.txn_loop, &wal_writer.write_event);
@@ -942,7 +944,7 @@ wal_opt_rotate(struct xlog **wal, struct recovery_state *r,
 
 	ERROR_INJECT_RETURN(ERRINJ_WAL_ROTATE);
 
-	if (l != NULL && l->rows >= r->rows_per_wal) {
+	if (l != NULL && l->rows >= r->writer->rows_per_wal) {
 		/*
 		 * if l->rows == 1, xlog_close() does
 		 * xlog_rename() for us.
@@ -1033,7 +1035,7 @@ wal_write_to_disk(struct recovery_state *r, struct wal_writer *writer,
 		if (wal_opt_rotate(wal, r, &writer->vclock) != 0)
 			break;
 		struct wal_write_request *batch_end;
-		batch_end = wal_fill_batch(*wal, batch, r->rows_per_wal,
+		batch_end = wal_fill_batch(*wal, batch, writer->rows_per_wal,
 					   req);
 		write_end = wal_write_batch(*wal, batch, req, batch_end,
 					    &writer->vclock);
@@ -1151,7 +1153,7 @@ snapshot_write_row(struct recovery_state *r, struct xlog *l,
 	 * This makes streaming such rows to a replica or
 	 * to recovery look similar to streaming a normal
 	 * WAL. @sa the place which skips old rows in
-	 * recovery_process().
+	 * recovery_apply_row().
 	 */
 	row->lsn = ++l->rows;
 	row->sync = 0; /* don't write sync to wal */
@@ -1213,9 +1215,8 @@ snapshot_write_row(struct recovery_state *r, struct xlog *l,
 }
 
 void
-snapshot_save(struct recovery_state *r)
+snapshot_save(struct recovery_state *r, snapshot_f snapshot_handler)
 {
-	assert(r->snapshot_handler != NULL);
 	struct xlog *snap = xlog_create(&r->snap_dir, &r->vclock);
 	if (snap == NULL)
 		panic_status(errno, "Failed to save snapshot: failed to open file in write mode.");
@@ -1226,7 +1227,7 @@ snapshot_save(struct recovery_state *r)
 	 */
 	say_info("saving snapshot `%s'", snap->filename);
 
-	r->snapshot_handler(snap);
+	snapshot_handler(snap);
 
 	xlog_close(snap);
 
diff --git a/src/box/recovery.h b/src/box/recovery.h
index 3ab0e93456e3294fe2dce26b55aa96f4e5b29741..76fac6b3e882429db9be82d950edb0d915fe755b 100644
--- a/src/box/recovery.h
+++ b/src/box/recovery.h
@@ -46,9 +46,8 @@
 struct fiber;
 struct tbuf;
 struct recovery_state;
-typedef void (row_handler)(struct recovery_state *, void *,
+typedef void (apply_row_f)(struct recovery_state *, void *,
 			   struct xrow_header *packet);
-typedef void (snapshot_handler)(struct xlog *);
 
 /** A "condition variable" that allows fibers to wait when a given
 * LSN makes it to disk.
@@ -108,17 +107,12 @@ struct recovery_state {
 		struct relay relay;
 	};
 	/**
-	 * row_handler is a module callback invoked during initial
-	 * recovery and when reading rows from the master. It is
-	 * presented with the most recent format of data.
-	 * row_reader is responsible for converting data from old
-	 * formats.
+	 * apply_row is a module callback invoked during initial
+	 * recovery and when reading rows from the master.
 	 */
-	row_handler *row_handler;
-	void *row_handler_param;
-	snapshot_handler *snapshot_handler;
+	apply_row_f *apply_row;
+	void *apply_row_param;
 	uint64_t snap_io_rate_limit;
-	int rows_per_wal;
 	enum wal_mode wal_mode;
 	struct tt_uuid server_uuid;
 	uint32_t server_id;
@@ -128,9 +122,7 @@
 
 struct recovery_state *
 recovery_new(const char *snap_dirname, const char *wal_dirname,
-	     row_handler row_handler, void *row_handler_param,
-	     snapshot_handler snapshot_handler,
-	     int rows_per_wal);
+	     apply_row_f apply_row, void *apply_row_param);
 
 void
 recovery_delete(struct recovery_state *r);
@@ -151,19 +143,23 @@ recovery_has_data(struct recovery_state *r)
 void recovery_bootstrap(struct recovery_state *r);
 void recover_snap(struct recovery_state *r);
 void recovery_follow_local(struct recovery_state *r,
			    ev_tstamp wal_dir_rescan_delay);
-void recovery_finalize(struct recovery_state *r);
+void recovery_finalize(struct recovery_state *r, int rows_per_wal);
 
 int wal_write(struct recovery_state *r, struct xrow_header *packet);
 
 void recovery_setup_panic(struct recovery_state *r, bool on_snap_error,
			   bool on_wal_error);
-void recovery_process(struct recovery_state *r, struct xrow_header *packet);
+void recovery_apply_row(struct recovery_state *r, struct xrow_header *packet);
 
 struct fio_batch;
 void snapshot_write_row(struct recovery_state *r, struct xlog *l,
			struct xrow_header *packet);
-void snapshot_save(struct recovery_state *r);
+
+typedef void (snapshot_f)(struct xlog *);
+
+void
+snapshot_save(struct recovery_state *r, snapshot_f snapshot_handler);
 
 #if defined(__cplusplus)
 } /* extern "C" */
diff --git a/src/box/replica.cc b/src/box/replica.cc
index 7c17abf141ef743bad9a4eb8e9b7331e46e3b56a..70757da09c48ffe6c70950988035a1f3e7703798 100644
--- a/src/box/replica.cc
+++ b/src/box/replica.cc
@@ -189,7 +189,7 @@ replica_bootstrap(struct recovery_state *r)
 			break;
 		} else if (iproto_type_is_dml(row.type)) {
 			/* Regular snapshot row (IPROTO_INSERT) */
-			recovery_process(r, &row);
+			recovery_apply_row(r, &row);
 		} else /* error or unexpected packet */ {
 			xrow_decode_error(&row); /* rethrow error */
 		}
@@ -264,7 +264,7 @@ pull_from_remote(va_list ap)
 
 		if (iproto_type_is_error(row.type))
 			xrow_decode_error(&row); /* error */
-		recovery_process(r, &row);
+		recovery_apply_row(r, &row);
 
 		iobuf_reset(iobuf);
 		fiber_gc();
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 34c0b46f7b67d4734d69504c739dfe453c9c6f0d..a451c50484c34962625082a29c19748511a1ab48 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -234,8 +234,7 @@ replication_join(int fd, struct xrow_header *packet)
 {
 	struct recovery_state *r;
 	r = recovery_new(cfg_snap_dir, cfg_wal_dir,
-			 replication_relay_send_row,
-			 NULL, NULL, INT32_MAX);
+			 replication_relay_send_row, NULL);
 	auto recovery_guard = make_scoped_guard([&]{
 		recovery_delete(r);
 	});
@@ -753,8 +752,7 @@ replication_relay_loop(struct relay *relay)
 	struct recovery_state *r = NULL;
 	try {
 		r = recovery_new(cfg_snap_dir, cfg_wal_dir,
-				 replication_relay_send_row,
-				 NULL, NULL, INT32_MAX);
+				 replication_relay_send_row, NULL);
 		r->relay = *relay; /* copy relay state to recovery */
 		assert(r->relay.type == IPROTO_SUBSCRIBE);