diff --git a/src/box/applier.cc b/src/box/applier.cc index 633bd4d009648892ccd49465dbbfbc372efe2fe5..33f291050f8a1f208f7f50e5265a22d36ad00e77 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -157,7 +157,7 @@ applier_check_sync(struct applier *applier) if (applier->state == APPLIER_SYNC && applier->lag <= replication_sync_lag && vclock_compare_ignore0(&applier->remote_vclock_at_subscribe, - &replicaset.vclock) <= 0) { + instance_vclock) <= 0) { /* Applier is synced, switch to "follow". */ applier_set_state(applier, APPLIER_FOLLOW); } @@ -788,7 +788,7 @@ applier_wait_snapshot(struct applier *applier) * Used to initialize the replica's initial * vclock in bootstrap_from_master() */ - xrow_decode_vclock_xc(&row, &replicaset.vclock); + xrow_decode_vclock_xc(&row, instance_vclock); } coio_read_xrow(io, ibuf, &row); @@ -844,7 +844,7 @@ applier_wait_snapshot(struct applier *applier) * vclock yet, do it now. In 1.7+ * this vclock is not used. */ - xrow_decode_vclock_xc(&row, &replicaset.vclock); + xrow_decode_vclock_xc(&row, instance_vclock); } break; /* end of stream */ } else if (iproto_type_is_error(row.type)) { @@ -1239,7 +1239,7 @@ applier_rollback_by_wal_io(int64_t signature) trigger_run(&replicaset.applier.on_rollback, NULL); /* Rollback applier vclock to the committed one. */ - vclock_copy(&replicaset.applier.vclock, &replicaset.vclock); + vclock_copy(&replicaset.applier.vclock, instance_vclock); } static int @@ -1527,7 +1527,7 @@ apply_final_join_tx(uint32_t replica_id, struct stailq *rows) &stailq_last_entry(rows, struct applier_tx_row, next)->row; int rc = 0; /* WAL isn't enabled yet, so follow vclock manually. 
*/ - vclock_follow_xrow(&replicaset.vclock, last_row); + vclock_follow_xrow(instance_vclock, last_row); if (unlikely(iproto_type_is_synchro_request(txr->row.type))) { rc = apply_synchro_req(replica_id, &txr->row, &txr->req.synchro); @@ -1758,7 +1758,7 @@ applier_signal_ack(struct applier *applier) applier->txn_last_tm = 0; applier->ack_msg.vclock_sync = applier->last_vclock_sync; applier->ack_msg.term = box_raft()->term; - vclock_copy(&applier->ack_msg.vclock, &replicaset.vclock); + vclock_copy(&applier->ack_msg.vclock, instance_vclock); cmsg_init(&applier->ack_msg.base, applier->ack_route); cpipe_push(&applier->applier_thread->thread_pipe, &applier->ack_msg.base); @@ -2390,7 +2390,7 @@ applier_subscribe(struct applier *applier) struct subscribe_request req; memset(&req, 0, sizeof(req)); - vclock_copy(&req.vclock, &replicaset.vclock); + vclock_copy(&req.vclock, instance_vclock); ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK, { vclock_create(&req.vclock); }); diff --git a/src/box/box.cc b/src/box/box.cc index b10c6cbb254e069d6999a21c7a94037e74e3ca8e..983be84baaf82f488c39c3a904d6338ad4b3c46b 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -103,6 +103,15 @@ static char status[64] = "unconfigured"; +/** + * Storage for the WAL vclock: it holds the vclock of the end of the WAL, not + * the vclock of the in-memory state. + * + * It is encapsulated inside this file to protect it from any illegal changes + * by outside code. 
+ */ +static struct vclock instance_vclock_storage; + /** box.stat rmean */ struct rmean *rmean_box; @@ -115,7 +124,8 @@ struct rlist box_on_shutdown_trigger_list = struct event *box_on_shutdown_event = NULL; -const struct vclock *box_vclock = &replicaset.vclock; +const struct vclock *box_vclock = instance_vclock; +struct vclock *instance_vclock = &instance_vclock_storage; const char *box_auth_type; @@ -324,10 +334,10 @@ box_broadcast_ballot_on_timeout(ev_loop *loop, ev_timer *timer, int events) (void)timer; (void)events; static struct vclock broadcast_vclock; - if (vclock_compare_ignore0(&broadcast_vclock, &replicaset.vclock) == 0) + if (vclock_compare_ignore0(&broadcast_vclock, instance_vclock) == 0) return; box_broadcast_ballot(); - vclock_copy(&broadcast_vclock, &replicaset.vclock); + vclock_copy(&broadcast_vclock, instance_vclock); } /** @@ -2659,7 +2669,7 @@ box_collect_confirmed_vclock(struct vclock *confirmed_vclock, double deadline) * We should check the vclock on self plus vclock_count - 1 remote * instances. 
*/ - vclock_copy(confirmed_vclock, &replicaset.vclock); + vclock_copy(confirmed_vclock, instance_vclock); if (vclock_count <= 1) return 0; @@ -2739,7 +2749,7 @@ box_wait_vclock_f(struct trigger *trigger, void *event) (void)event; struct box_wait_vclock_data *data = (struct box_wait_vclock_data *)trigger->data; - if (vclock_compare_ignore0(data->vclock, &replicaset.vclock) <= 0) { + if (vclock_compare_ignore0(data->vclock, instance_vclock) <= 0) { data->is_ready = true; fiber_wakeup(data->waiter); } @@ -2753,7 +2763,7 @@ box_wait_vclock_f(struct trigger *trigger, void *event) static int box_wait_vclock(const struct vclock *vclock, double deadline) { - if (vclock_compare_ignore0(vclock, &replicaset.vclock) <= 0) + if (vclock_compare_ignore0(vclock, instance_vclock) <= 0) return 0; struct trigger on_wal_write; struct box_wait_vclock_data data = { @@ -4319,7 +4329,7 @@ box_process_fetch_snapshot(struct iostream *io, /* Remember master's vclock after the last request */ struct vclock stop_vclock; - vclock_copy(&stop_vclock, &replicaset.vclock); + vclock_copy(&stop_vclock, instance_vclock); /* Send end of snapshot data marker */ struct xrow_header row; @@ -4346,7 +4356,7 @@ static void box_localize_vclock(const struct vclock *remote, struct vclock *local) { vclock_copy(local, remote); - vclock_reset(local, 0, vclock_get(&replicaset.vclock, 0)); + vclock_reset(local, 0, vclock_get(instance_vclock, 0)); } void @@ -4424,7 +4434,7 @@ box_process_register(struct iostream *io, const struct xrow_header *header) tnt_raise(ClientError, ER_CANNOT_REGISTER); /* Remember master's vclock after the last request */ struct vclock stop_vclock; - vclock_copy(&stop_vclock, &replicaset.vclock); + vclock_copy(&stop_vclock, instance_vclock); /* * Feed replica with WALs up to the REGISTER itself so that it gets own * registration entry. 
@@ -4436,7 +4446,7 @@ box_process_register(struct iostream *io, const struct xrow_header *header) RegionGuard region_guard(&fiber()->gc); struct xrow_header row; /* Send end of WAL stream marker */ - xrow_encode_vclock(&row, &replicaset.vclock); + xrow_encode_vclock(&row, instance_vclock); row.sync = header->sync; coio_write_xrow(io, &row); @@ -4552,8 +4562,8 @@ box_process_join(struct iostream *io, const struct xrow_header *header) * Register the replica as a WAL consumer so that * it can resume FINAL JOIN where INITIAL JOIN ends. */ - struct gc_consumer *gc = gc_consumer_register(&replicaset.vclock, - "replica %s", tt_uuid_str(&req.instance_uuid)); + struct gc_consumer *gc = gc_consumer_register( + instance_vclock, "replica %s", tt_uuid_str(&req.instance_uuid)); if (gc == NULL) diag_raise(); auto gc_guard = make_scoped_guard([&] { gc_consumer_unregister(gc); }); @@ -4581,7 +4591,7 @@ box_process_join(struct iostream *io, const struct xrow_header *header) /* Remember master's vclock after the last request */ struct vclock stop_vclock; - vclock_copy(&stop_vclock, &replicaset.vclock); + vclock_copy(&stop_vclock, instance_vclock); /* Send end of initial stage data marker */ struct xrow_header row; RegionGuard region_guard(&fiber()->gc); @@ -4598,7 +4608,7 @@ box_process_join(struct iostream *io, const struct xrow_header *header) say_info("final data sent."); /* Send end of WAL stream marker */ - xrow_encode_vclock(&row, &replicaset.vclock); + xrow_encode_vclock(&row, instance_vclock); row.sync = header->sync; coio_write_xrow(io, &row); @@ -4748,7 +4758,7 @@ box_process_subscribe(struct iostream *io, const struct xrow_header *header) */ struct subscribe_response rsp; memset(&rsp, 0, sizeof(rsp)); - vclock_copy(&rsp.vclock, &replicaset.vclock); + vclock_copy(&rsp.vclock, instance_vclock); rsp.replicaset_uuid = REPLICASET_UUID; strlcpy(rsp.replicaset_name, REPLICASET_NAME, NODE_NAME_SIZE_MAX); struct xrow_header row; @@ -4810,7 +4820,7 @@ box_process_vote(struct ballot 
*ballot) ballot->is_anon = cfg_replication_anon; ballot->is_ro = is_ro_summary; ballot->is_booted = is_box_configured; - vclock_copy(&ballot->vclock, &replicaset.vclock); + vclock_copy(&ballot->vclock, instance_vclock); vclock_copy(&ballot->gc_vclock, &gc.vclock); ballot->bootstrap_leader_uuid = bootstrap_leader_uuid; if (*INSTANCE_NAME != '\0') { @@ -5119,7 +5129,7 @@ bootstrap_from_master(struct replica *master) * Process final data (WALs). */ engine_begin_final_recovery_xc(); - recovery_journal_create(&replicaset.vclock); + recovery_journal_create(&instance_vclock_storage); if (!cfg_replication_anon) { applier_resume_to_state(applier, APPLIER_JOINED, @@ -5278,7 +5288,7 @@ local_recovery(const struct vclock *checkpoint_vclock) */ box_vclock = &recovery->vclock; auto guard = make_scoped_guard([&]{ - box_vclock = &replicaset.vclock; + box_vclock = instance_vclock; recovery_delete(recovery); }); @@ -5288,10 +5298,10 @@ local_recovery(const struct vclock *checkpoint_vclock) * so we must reflect this in replicaset vclock to * not attempt to apply these rows twice. */ - recovery_scan(recovery, &replicaset.vclock, &gc.vclock, + recovery_scan(recovery, &instance_vclock_storage, &gc.vclock, &wal_stream.base); box_broadcast_ballot(); - say_info("instance vclock %s", vclock_to_string(&replicaset.vclock)); + say_info("instance vclock %s", vclock_to_string(instance_vclock)); if (wal_dir_lock >= 0) { if (box_listen() != 0) @@ -5394,11 +5404,11 @@ local_recovery(const struct vclock *checkpoint_vclock) * Advance replica set vclock to reflect records * applied in hot standby mode. 
*/ - vclock_copy(&replicaset.vclock, &recovery->vclock); + vclock_copy(&instance_vclock_storage, &recovery->vclock); if (box_listen() != 0) diag_raise(); box_update_replication(); - } else if (vclock_compare(&replicaset.vclock, &recovery->vclock) != 0) { + } else if (vclock_compare(instance_vclock, &recovery->vclock) != 0) { /* * There are several reasons for a node to recover a vclock not * matching the one scanned initially: @@ -5424,12 +5434,13 @@ local_recovery(const struct vclock *checkpoint_vclock) const char *mismatch_str = tt_sprintf("Replicaset vclock %s doesn't match " "recovered data %s", - vclock_to_string(&replicaset.vclock), + vclock_to_string(instance_vclock), vclock_to_string(&recovery->vclock)); if (box_is_force_recovery) { say_warn("%s: ignoring, because 'force_recovery' " "configuration option is set.", mismatch_str); - vclock_copy(&replicaset.vclock, &recovery->vclock); + vclock_copy(&instance_vclock_storage, + &recovery->vclock); } else { panic("Can't proceed. %s.", mismatch_str); } @@ -5580,7 +5591,7 @@ box_cfg_xc(void) * replicaset.applier.vclock is filled with real * value where local restore has already completed */ - vclock_copy(&replicaset.applier.vclock, &replicaset.vclock); + vclock_copy(&replicaset.applier.vclock, instance_vclock); /* * Exclude self from GC delay because we care @@ -5629,7 +5640,7 @@ box_cfg_xc(void) struct raft *raft = box_raft(); if (!cfg_replication_anon) raft_cfg_instance_id(raft, instance_id); - raft_cfg_vclock(raft, &replicaset.vclock); + raft_cfg_vclock(raft, instance_vclock); if (box_set_election_timeout() != 0) diag_raise(); @@ -6007,7 +6018,7 @@ box_storage_init(void) double wal_retention_period = box_check_wal_retention_period_xc(); if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_size, wal_retention_period, &INSTANCE_UUID, - on_wal_garbage_collection, + &instance_vclock_storage, on_wal_garbage_collection, on_wal_checkpoint_threshold) != 0) { diag_raise(); } diff --git a/src/box/box.h b/src/box/box.h 
index da890a42f16fa380b5aa867ed6c6ef469938d3b6..b73f78e7f5a21f52d2b6daecd4e99f55434f6b4e 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -59,12 +59,18 @@ struct ballot; * Pointer to TX thread local vclock. * * During recovery it points to the current recovery position. - * Once recovery is complete, it is set to &replicaset.vclock. + * Once recovery is complete, it is set to instance_vclock. * * We need it for reporting the actual vclock in box.info while * the instance is in hot standby mode. */ extern const struct vclock *box_vclock; +/** + * Vclock of the instance as it is stored on disk. It means that during + * recovery this vclock points to the end of WAL, not to the current recovery + * position. + */ +extern struct vclock *instance_vclock; /** * Name of the authentication method that is currently used on diff --git a/src/box/iproto.cc b/src/box/iproto.cc index a3245d668ccb1ab7c4205c9155ddbfd5dee6fc90..78c099a8757f4854793c6b70e8da3074ed9affe3 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -2737,7 +2737,7 @@ tx_process_misc(struct cmsg *m) ::schema_version); break; case IPROTO_VOTE_DEPRECATED: - iproto_reply_vclock(out, &replicaset.vclock, msg->header.sync, + iproto_reply_vclock(out, instance_vclock, msg->header.sync, ::schema_version); break; case IPROTO_VOTE: diff --git a/src/box/lua/info.c b/src/box/lua/info.c index b9fb2f120a99df2b0059c0b5cb1344df34576f04..1ce65fe404e85a3896473959003ab46b91dacafa 100644 --- a/src/box/lua/info.c +++ b/src/box/lua/info.c @@ -198,7 +198,7 @@ lbox_pushreplica(lua_State *L, struct replica *replica) lua_setfield(L, -2, "name"); lua_pushstring(L, "lsn"); - luaL_pushuint64(L, vclock_get(&replicaset.vclock, replica->id)); + luaL_pushuint64(L, vclock_get(instance_vclock, replica->id)); lua_settable(L, -3); if (applier != NULL && applier->state != APPLIER_OFF) { diff --git a/src/box/relay.cc b/src/box/relay.cc index 813fca94158c75f59e835cc1724037b4cca3ea39..6fced997117f1cb6937a1ba52b1f9f0bd3e1f1c0 100644 --- 
a/src/box/relay.cc +++ b/src/box/relay.cc @@ -57,6 +57,7 @@ #include "wal.h" #include "txn_limbo.h" #include "raft.h" +#include "box.h" #include <stdlib.h> @@ -567,7 +568,7 @@ relay_final_join(struct replica *replica, struct iostream *io, uint64_t sync, tnt_raise(ClientError, ER_INJECTION, "relay final join")); ERROR_INJECT(ERRINJ_RELAY_FINAL_SLEEP, { - while (vclock_compare(stop_vclock, &replicaset.vclock) == 0) + while (vclock_compare(stop_vclock, instance_vclock) == 0) fiber_sleep(0.001); }); } @@ -1110,7 +1111,7 @@ relay_subscribe(struct replica *replica, struct iostream *io, uint64_t sync, replica_on_relay_stop(replica); }); - vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock); + vclock_copy(&relay->local_vclock_at_subscribe, instance_vclock); /* * Save the first vclock as 'received'. Because it was really received. */ diff --git a/src/box/replication.cc b/src/box/replication.cc index ef10830bd6d161e10c4eb25b42089969db507fd6..dbbebe60408baace599ab978069b1cfaa7d73f6b 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -204,12 +204,11 @@ replication_init(int num_threads) memset(&replicaset, 0, sizeof(replicaset)); replica_hash_new(&replicaset.hash); rlist_create(&replicaset.anon); - vclock_create(&replicaset.vclock); fiber_cond_create(&replicaset.applier.cond); latch_create(&replicaset.applier.order_latch); vclock_create(&replicaset.applier.vclock); - vclock_copy(&replicaset.applier.vclock, &replicaset.vclock); + vclock_copy(&replicaset.applier.vclock, instance_vclock); rlist_create(&replicaset.applier.on_rollback); rlist_create(&replicaset.applier.on_wal_write); @@ -366,7 +365,7 @@ replica_set_id(struct replica *replica, uint32_t replica_id) * anonymous to a normal one. 
*/ assert(replica->gc == NULL); - replica->gc = gc_consumer_register(&replicaset.vclock, + replica->gc = gc_consumer_register(instance_vclock, "replica %s", tt_uuid_str(&replica->uuid)); } @@ -1205,8 +1204,7 @@ replicaset_needs_rejoin(struct replica **master) continue; const struct ballot *ballot = &applier->ballot; - if (vclock_compare(&ballot->gc_vclock, - &replicaset.vclock) <= 0) { + if (vclock_compare(&ballot->gc_vclock, instance_vclock) <= 0) { /* * There's at least one master that still stores * WALs needed by this instance. Proceed to local @@ -1218,14 +1216,15 @@ replicaset_needs_rejoin(struct replica **master) const char *uuid_str = tt_uuid_str(&replica->uuid); const char *addr_str = sio_strfaddr(&applier->addr, applier->addr_len); - const char *local_vclock_str = vclock_to_string(&replicaset.vclock); + const char *local_vclock_str = + vclock_to_string(instance_vclock); const char *remote_vclock_str = vclock_to_string(&ballot->vclock); const char *gc_vclock_str = vclock_to_string(&ballot->gc_vclock); say_info("can't follow %s at %s: required %s available %s", uuid_str, addr_str, local_vclock_str, gc_vclock_str); - if (vclock_compare(&replicaset.vclock, &ballot->vclock) > 0) { + if (vclock_compare(instance_vclock, &ballot->vclock) > 0) { /* * Replica has some rows that are not present on * the master. Don't rebootstrap as we don't want diff --git a/src/box/replication.h b/src/box/replication.h index 82a54976e47144af08b112dfe3a60a0071f876d5..6a9aaccce4191c23aabca9d7ab9be21b42edf92a 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -294,11 +294,6 @@ struct replicaset { * to connect and those that failed to connect. */ struct rlist anon; - /** - * TX thread local vclock reflecting the state - * of the cluster as maintained by appliers. - */ - struct vclock vclock; /** * This flag is set while the instance is bootstrapping * from a remote master. 
diff --git a/src/box/wal.c b/src/box/wal.c index b2f5c8e4e4e6acfd82ea880925e72788c82cf466..d5a3c800301a44d9b3a92eb3d8ce0a1f5295c248 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -104,6 +104,8 @@ struct wal_writer struct cpipe wal_pipe; /** A memory pool for messages. */ struct mempool msg_pool; + /** Vclock to propagate on write. */ + struct vclock *instance_vclock; /** * A last journal entry submitted to write. This is a * 'rollback border'. When rollback starts, all @@ -376,7 +378,7 @@ tx_complete_batch(struct cmsg *msg) tx_complete_rollback(); } /* Update the tx vclock to the latest written by wal. */ - vclock_copy(&replicaset.vclock, &batch->vclock); + vclock_copy(writer->instance_vclock, &batch->vclock); tx_schedule_queue(&batch->commit); trigger_run(&wal_on_write, NULL); mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base)); @@ -421,9 +423,11 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_size, double wal_retention_period, const struct tt_uuid *instance_uuid, + struct vclock *instance_vclock, wal_on_garbage_collection_f on_garbage_collection, wal_on_checkpoint_threshold_f on_checkpoint_threshold) { + writer->instance_vclock = instance_vclock; writer->wal_mode = wal_mode; writer->wal_max_size = wal_max_size; @@ -547,13 +551,14 @@ int wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_size, double wal_retention_period, const struct tt_uuid *instance_uuid, + struct vclock *instance_vclock, wal_on_garbage_collection_f on_garbage_collection, wal_on_checkpoint_threshold_f on_checkpoint_threshold) { /* Initialize the state. */ struct wal_writer *writer = &wal_writer_singleton; wal_writer_create(writer, wal_mode, wal_dirname, wal_max_size, - wal_retention_period, instance_uuid, + wal_retention_period, instance_uuid, instance_vclock, on_garbage_collection, on_checkpoint_threshold); /* Start WAL thread. 
*/ @@ -572,7 +577,7 @@ wal_enable(void) struct wal_writer *writer = &wal_writer_singleton; /* Initialize the writer vclock from the recovery state. */ - vclock_copy(&writer->vclock, &replicaset.vclock); + vclock_copy(&writer->vclock, writer->instance_vclock); /* * Scan the WAL directory to build an index of all @@ -1233,7 +1238,7 @@ wal_write_to_disk(struct cmsg *msg) } /* * Remember the vclock of the last successfully written row so - * that we can update replicaset.vclock once this message gets + * that we can update instance_vclock once this message gets * back to tx. */ vclock_copy(&wal_msg->vclock, &writer->vclock); @@ -1415,7 +1420,7 @@ wal_write_none_async(struct journal *journal, vclock_create(&vclock_diff); wal_assign_lsn(&vclock_diff, &writer->vclock, entry); vclock_merge(&writer->vclock, &vclock_diff); - vclock_copy(&replicaset.vclock, &writer->vclock); + vclock_copy(writer->instance_vclock, &writer->vclock); entry->res = vclock_sum(&writer->vclock); journal_async_complete(entry); return 0; diff --git a/src/box/wal.h b/src/box/wal.h index 81e9831574389c47fe9ab6cee9a638a002364834..8ce194b6d185650a0d33532f79422a704799daa5 100644 --- a/src/box/wal.h +++ b/src/box/wal.h @@ -109,6 +109,7 @@ int wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_size, double wal_retention_period, const struct tt_uuid *instance_uuid, + struct vclock *instance_vclock, wal_on_garbage_collection_f on_garbage_collection, wal_on_checkpoint_threshold_f on_checkpoint_threshold);