From 67b60f08d8a937c30f0ab7ee52a56c260d9d8dbc Mon Sep 17 00:00:00 2001 From: sergepetrenko <sergepetrenko@tarantool.org> Date: Tue, 18 Aug 2020 14:49:14 +0300 Subject: [PATCH] raft: relay status updates to followers The patch introduces a new type of system message used to notify the followers of the instance's raft status updates. It's relay's responsibility to deliver the new system rows to its peers. The notification system reuses and extends the same row type used to persist raft state in WAL and snapshot. Part of #1146 Part of #5204 --- src/box/applier.cc | 28 ++++++- src/box/box.cc | 17 ++++- src/box/iproto_constants.h | 2 + src/box/memtx_engine.c | 3 +- src/box/raft.c | 73 +++++++++++++++++- src/box/raft.h | 35 ++++++++- src/box/relay.cc | 150 ++++++++++++++++++++++++++++++++++++- src/box/relay.h | 7 ++ src/box/xrow.c | 87 +++++++++++++++++---- src/box/xrow.h | 5 +- 10 files changed, 382 insertions(+), 25 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index 1a0b55640a..9fed3c071f 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -55,6 +55,7 @@ #include "scoped_guard.h" #include "txn_limbo.h" #include "journal.h" +#include "raft.h" STRS(applier_state, applier_STATE); @@ -878,6 +879,18 @@ apply_synchro_row(struct xrow_header *row) return -1; } +static int +applier_handle_raft(struct applier *applier, struct xrow_header *row) +{ + assert(iproto_type_is_raft_request(row->type)); + + struct raft_request req; + struct vclock candidate_clock; + if (xrow_decode_raft(row, &req, &candidate_clock) != 0) + return -1; + return raft_process_msg(&req, applier->instance_id); +} + /** * Apply all rows in the rows queue as a single transaction. * @@ -1222,11 +1235,20 @@ applier_subscribe(struct applier *applier) * In case of an heartbeat message wake a writer up * and check applier state. 
*/ - if (stailq_first_entry(&rows, struct applier_tx_row, - next)->row.lsn == 0) + struct xrow_header *first_row = + &stailq_first_entry(&rows, struct applier_tx_row, + next)->row; + if (first_row->lsn == 0) { + if (unlikely(iproto_type_is_raft_request( + first_row->type))) { + if (applier_handle_raft(applier, + first_row) != 0) + diag_raise(); + } applier_signal_ack(applier); - else if (applier_apply_tx(&rows) != 0) + } else if (applier_apply_tx(&rows) != 0) { diag_raise(); + } if (ibuf_used(ibuf) == 0) ibuf_reset(ibuf); diff --git a/src/box/box.cc b/src/box/box.cc index 99a15bfd07..a8542cb38e 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -387,7 +387,8 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row) } if (iproto_type_is_raft_request(row->type)) { struct raft_request raft_req; - if (xrow_decode_raft(row, &raft_req) != 0) + /* Vclock is never persisted in WAL by Raft. */ + if (xrow_decode_raft(row, &raft_req, NULL) != 0) diag_raise(); raft_process_recovery(&raft_req); return; @@ -2146,7 +2147,19 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header) tt_uuid_str(&replica_uuid), sio_socketname(io->fd)); say_info("remote vclock %s local vclock %s", vclock_to_string(&replica_clock), vclock_to_string(&vclock)); - + if (raft_is_enabled()) { + /* + * Send out the current raft state of the instance. Don't do + * that if Raft is disabled. It can be that a part of the + * cluster still contains old versions, which can't handle Raft + * messages. So when it is disabled, its network footprint + * should be 0. 
+ */ + struct raft_request req; + raft_serialize_for_network(&req, &vclock); + xrow_encode_raft(&row, &fiber()->gc, &req); + coio_write_xrow(io, &row); + } /* * Replica clock is used in gc state and recovery * initialization, so we need to replace the remote 0-th diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index f9347f5550..d3738c7050 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -264,6 +264,8 @@ extern const char *iproto_type_strs[]; enum iproto_raft_keys { IPROTO_RAFT_TERM = 0, IPROTO_RAFT_VOTE = 1, + IPROTO_RAFT_STATE = 2, + IPROTO_RAFT_VCLOCK = 3, }; /** diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c index 2f38c2647f..8147557f68 100644 --- a/src/box/memtx_engine.c +++ b/src/box/memtx_engine.c @@ -207,7 +207,8 @@ memtx_engine_recover_raft(const struct xrow_header *row) { assert(row->type == IPROTO_RAFT); struct raft_request req; - if (xrow_decode_raft(row, &req) != 0) + /* Vclock is never persisted in WAL by Raft. */ + if (xrow_decode_raft(row, &req, NULL) != 0) return -1; raft_process_recovery(&req); return 0; diff --git a/src/box/raft.c b/src/box/raft.c index ee54d02b77..024433369f 100644 --- a/src/box/raft.c +++ b/src/box/raft.c @@ -34,9 +34,20 @@ #include "journal.h" #include "xrow.h" #include "small/region.h" +#include "replication.h" +#include "relay.h" + +const char *raft_state_strs[] = { + NULL, + "follower", + "candidate", + "leader", +}; /** Raft state of this instance. */ struct raft raft = { + .leader = 0, + .state = RAFT_STATE_FOLLOWER, .is_enabled = false, .is_candidate = false, .term = 1, @@ -50,18 +61,71 @@ raft_process_recovery(const struct raft_request *req) raft.term = req->term; if (req->vote != 0) raft.vote = req->vote; + /* + * Role is never persisted. If recovery is happening, the + * node was restarted, and the former role can be false + * anyway. + */ + assert(req->state == 0); + /* + * Vclock is always persisted by some other subsystem - WAL, snapshot. 
+ * It is used only to decide to whom to give the vote during election, + * as a part of the volatile state. + */ + assert(req->vclock == NULL); + /* Raft is not enabled until recovery is finished. */ + assert(!raft_is_enabled()); +} + +int +raft_process_msg(const struct raft_request *req, uint32_t source) +{ + (void)source; + if (req->term > raft.term) { + // Update term. + // The logic will be similar, but the code + // below is for testing purposes. + raft.term = req->term; + } + if (req->vote > 0) { + // Check whether the vote's for us. + } + switch (req->state) { + case RAFT_STATE_FOLLOWER: + break; + case RAFT_STATE_CANDIDATE: + // Perform voting logic. + break; + case RAFT_STATE_LEADER: + // Switch to a new leader. + break; + default: + break; + } + return 0; } void -raft_serialize_for_network(struct raft_request *req) +raft_serialize_for_network(struct raft_request *req, struct vclock *vclock) { + memset(req, 0, sizeof(*req)); req->term = raft.term; req->vote = raft.vote; + req->state = raft.state; + /* + * Raft does not own vclock, so it always expects it passed externally. + * Vclock is sent out only by candidate instances. 
+ */ + if (req->state == RAFT_STATE_CANDIDATE) { + req->vclock = vclock; + vclock_copy(vclock, &replicaset.vclock); + } } void raft_serialize_for_disk(struct raft_request *req) { + memset(req, 0, sizeof(*req)); req->term = raft.term; req->vote = raft.vote; } @@ -93,3 +157,10 @@ void raft_cfg_death_timeout(void) { } + +void +raft_broadcast(const struct raft_request *req) +{ + replicaset_foreach(replica) + relay_push_raft(replica->relay, req); +} diff --git a/src/box/raft.h b/src/box/raft.h index f272227528..8abde4f4c2 100644 --- a/src/box/raft.h +++ b/src/box/raft.h @@ -37,8 +37,19 @@ extern "C" { #endif struct raft_request; +struct vclock; + +enum raft_state { + RAFT_STATE_FOLLOWER = 1, + RAFT_STATE_CANDIDATE = 2, + RAFT_STATE_LEADER = 3, +}; + +extern const char *raft_state_strs[]; struct raft { + uint32_t leader; + enum raft_state state; bool is_enabled; bool is_candidate; uint64_t term; @@ -48,10 +59,25 @@ struct raft { extern struct raft raft; +/** Check if Raft is enabled. */ +static inline bool +raft_is_enabled(void) +{ + return raft.is_enabled; +} + /** Process a raft entry stored in WAL/snapshot. */ void raft_process_recovery(const struct raft_request *req); +/** + * Process a raft status message coming from the network. + * @param req Raft request. + * @param source Instance ID of the message sender. + */ +int +raft_process_msg(const struct raft_request *req, uint32_t source); + /** Configure whether Raft is enabled. */ void raft_cfg_is_enabled(bool is_enabled); @@ -88,7 +114,7 @@ raft_cfg_death_timeout(void); * cluster. It is allowed to save anything here, not only persistent state. */ void -raft_serialize_for_network(struct raft_request *req); +raft_serialize_for_network(struct raft_request *req, struct vclock *vclock); /** * Save complete Raft state into a request to be persisted on disk. 
Only term @@ -97,6 +123,13 @@ raft_serialize_for_network(struct raft_request *req); void raft_serialize_for_disk(struct raft_request *req); +/** + * Broadcast the changes in this instance's raft status to all + * the followers. + */ +void +raft_broadcast(const struct raft_request *req); + #if defined(__cplusplus) } #endif diff --git a/src/box/relay.cc b/src/box/relay.cc index 124b0f52f5..76430caa68 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -53,6 +53,7 @@ #include "xstream.h" #include "wal.h" #include "txn_limbo.h" +#include "raft.h" /** * Cbus message to send status updates from relay to tx thread. @@ -145,6 +146,12 @@ struct relay { alignas(CACHELINE_SIZE) /** Known relay vclock. */ struct vclock vclock; + /** + * True if the relay needs Raft updates. It can live fine + * without sending Raft updates, if it is a relay to an + * anonymous replica, for example. + */ + bool is_raft_enabled; } tx; }; @@ -572,6 +579,74 @@ relay_send_heartbeat(struct relay *relay) } } +/** A message to set Raft enabled flag in TX thread from a relay thread. */ +struct relay_is_raft_enabled_msg { + /** Base cbus message. */ + struct cmsg base; + /** + * First hop - TX thread, second hop - a relay thread, to notify about + * the flag being set. + */ + struct cmsg_hop route[2]; + /** Relay pointer to set the flag in. */ + struct relay *relay; + /** New flag value. */ + bool value; + /** Flag to wait for the flag being set, in a relay thread. */ + bool is_finished; +}; + +/** TX thread part of the Raft flag setting, first hop. */ +static void +tx_set_is_raft_enabled(struct cmsg *base) +{ + struct relay_is_raft_enabled_msg *msg = + (struct relay_is_raft_enabled_msg *)base; + msg->relay->tx.is_raft_enabled = msg->value; +} + +/** Relay thread part of the Raft flag setting, second hop. 
*/ +static void +relay_set_is_raft_enabled(struct cmsg *base) +{ + struct relay_is_raft_enabled_msg *msg = + (struct relay_is_raft_enabled_msg *)base; + msg->is_finished = true; +} + +/** + * Set relay Raft enabled flag from a relay thread to be accessed by the TX + * thread. + */ +static void +relay_send_is_raft_enabled(struct relay *relay, + struct relay_is_raft_enabled_msg *msg, bool value) +{ + msg->route[0].f = tx_set_is_raft_enabled; + msg->route[0].pipe = &relay->relay_pipe; + msg->route[1].f = relay_set_is_raft_enabled; + msg->route[1].pipe = NULL; + msg->relay = relay; + msg->value = value; + msg->is_finished = false; + cmsg_init(&msg->base, msg->route); + cpipe_push(&relay->tx_pipe, &msg->base); + /* + * cbus_call() can't be used, because it works only if the sender thread + * is a simple cbus_process() loop. But the relay thread is not - + * instead it calls cbus_process() manually when ready. And the thread + * loop consists of the main fiber wakeup. So cbus_call() would just + * hang, because cbus_process() wouldn't be called by the scheduler + * fiber. + */ + while (!msg->is_finished) { + cbus_process(&relay->endpoint); + if (msg->is_finished) + break; + fiber_yield(); + } +} + /** * A libev callback invoked when a relay client socket is ready * for read. This currently only happens when the client closes @@ -592,6 +667,10 @@ relay_subscribe_f(va_list ap) cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe, &relay->relay_pipe, NULL, NULL, cbus_process); + struct relay_is_raft_enabled_msg raft_enabler; + if (!relay->replica->anon) + relay_send_is_raft_enabled(relay, &raft_enabler, true); + /* * Setup garbage collection trigger. * Not needed for anonymous replicas, since they @@ -671,6 +750,9 @@ relay_subscribe_f(va_list ap) cpipe_push(&relay->tx_pipe, &relay->status_msg.msg); } + if (!relay->replica->anon) + relay_send_is_raft_enabled(relay, &raft_enabler, false); + /* * Log the error that caused the relay to break the loop. 
* Don't clear the error for status reporting. @@ -770,13 +852,75 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row) relay_send(relay, row); } +struct relay_raft_msg { + struct cmsg base; + struct cmsg_hop route; + struct raft_request req; + struct vclock vclock; + struct relay *relay; +}; + +static void +relay_raft_msg_push(struct cmsg *base) +{ + struct relay_raft_msg *msg = (struct relay_raft_msg *)base; + struct xrow_header row; + xrow_encode_raft(&row, &fiber()->gc, &msg->req); + try { + relay_send(msg->relay, &row); + } catch (Exception *e) { + relay_set_error(msg->relay, e); + fiber_cancel(fiber()); + } + free(msg); +} + +void +relay_push_raft(struct relay *relay, const struct raft_request *req) +{ + /* + * Raft updates don't stack. They are thrown away if they can't be pushed + * now. This is fine, as long as relays live much longer than the + * timeouts set in Raft. + */ + if (!relay->tx.is_raft_enabled) + return; + /* + * XXX: the message should be preallocated. It should + * work like Kharon in IProto. Relay should have 2 raft + * messages rotating. When one is sent, the other can be + * updated and a flag is set. When the first message is + * sent, the control returns to TX thread, sees the set + * flag, rotates the buffers, and sends it again. And so + * on. This is how it can work in future, with 0 heap + * allocations. Current solution with alloc-per-update is + * good enough as a start. Another option - wait until all + * is moved to WAL thread, where this will all happen + * in one thread and will be much simpler.
+ */ + struct relay_raft_msg *msg = + (struct relay_raft_msg *)malloc(sizeof(*msg)); + if (msg == NULL) { + panic("Couldn't allocate raft message"); + return; + } + msg->req = *req; + if (req->vclock != NULL) { + msg->req.vclock = &msg->vclock; + vclock_copy(&msg->vclock, req->vclock); + } + msg->route.f = relay_raft_msg_push; + msg->route.pipe = NULL; + cmsg_init(&msg->base, &msg->route); + msg->relay = relay; + cpipe_push(&relay->relay_pipe, &msg->base); +} + /** Send a single row to the client. */ static void relay_send_row(struct xstream *stream, struct xrow_header *packet) { struct relay *relay = container_of(stream, struct relay, stream); - assert(iproto_type_is_dml(packet->type) || - iproto_type_is_synchro_request(packet->type)); if (packet->group_id == GROUP_LOCAL) { /* * We do not relay replica-local rows to other @@ -793,6 +937,8 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet) packet->group_id = GROUP_DEFAULT; packet->bodycnt = 0; } + assert(iproto_type_is_dml(packet->type) || + iproto_type_is_synchro_request(packet->type)); /* Check if the rows from the instance are filtered. */ if ((1 << packet->replica_id & relay->id_filter) != 0) return; diff --git a/src/box/relay.h b/src/box/relay.h index 0632fa9127..b32e2ea2ac 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ -93,6 +93,13 @@ relay_vclock(const struct relay *relay); double relay_last_row_time(const struct relay *relay); +/** + * Send a Raft update request to the relay channel. It is not + * guaranteed that it will be delivered. The connection may break. 
*/ +void +relay_push_raft(struct relay *relay, const struct raft_request *req); + #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ diff --git a/src/box/xrow.c b/src/box/xrow.c index b9bbb19a0d..da5c6ffaea 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -962,11 +962,30 @@ int xrow_encode_raft(struct xrow_header *row, struct region *region, const struct raft_request *r) { - size_t size = mp_sizeof_map(2) + - mp_sizeof_uint(IPROTO_RAFT_TERM) + - mp_sizeof_uint(r->term) + - mp_sizeof_uint(IPROTO_RAFT_VOTE) + - mp_sizeof_uint(r->vote); + /* + * The term is always encoded. Sometimes the rest can even be ignored if + * the term is too old. + */ + int map_size = 1; + size_t size = mp_sizeof_uint(IPROTO_RAFT_TERM) + + mp_sizeof_uint(r->term); + if (r->vote != 0) { + ++map_size; + size += mp_sizeof_uint(IPROTO_RAFT_VOTE) + + mp_sizeof_uint(r->vote); + } + if (r->state != 0) { + ++map_size; + size += mp_sizeof_uint(IPROTO_RAFT_STATE) + + mp_sizeof_uint(r->state); + } + if (r->vclock != NULL) { + ++map_size; + size += mp_sizeof_uint(IPROTO_RAFT_VCLOCK) + + mp_sizeof_vclock_ignore0(r->vclock); + } + size += mp_sizeof_map(map_size); + char *buf = region_alloc(region, size); if (buf == NULL) { diag_set(OutOfMemory, size, "region_alloc", "buf"); @@ -975,43 +994,83 @@ xrow_encode_raft(struct xrow_header *row, struct region *region, memset(row, 0, sizeof(*row)); row->type = IPROTO_RAFT; row->body[0].iov_base = buf; - row->body[0].iov_len = size; row->group_id = GROUP_LOCAL; row->bodycnt = 1; - buf = mp_encode_map(buf, 2); + const char *begin = buf; + + buf = mp_encode_map(buf, map_size); buf = mp_encode_uint(buf, IPROTO_RAFT_TERM); buf = mp_encode_uint(buf, r->term); - buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE); - buf = mp_encode_uint(buf, r->vote); + if (r->vote != 0) { + buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE); + buf = mp_encode_uint(buf, r->vote); + } + if (r->state != 0) { + buf = mp_encode_uint(buf, IPROTO_RAFT_STATE); + buf = 
mp_encode_uint(buf, r->state); + } + if (r->vclock != NULL) { + buf = mp_encode_uint(buf, IPROTO_RAFT_VCLOCK); + buf = mp_encode_vclock_ignore0(buf, r->vclock); + } + row->body[0].iov_len = buf - begin; return 0; } int -xrow_decode_raft(const struct xrow_header *row, struct raft_request *r) +xrow_decode_raft(const struct xrow_header *row, struct raft_request *r, + struct vclock *vclock) { - /* TODO: handle bad format. */ assert(row->type == IPROTO_RAFT); - assert(row->bodycnt == 1); - assert(row->group_id == GROUP_LOCAL); + if (row->bodycnt != 1 || row->group_id != GROUP_LOCAL) { + diag_set(ClientError, ER_INVALID_MSGPACK, + "malformed raft request"); + return -1; + } memset(r, 0, sizeof(*r)); - const char *pos = row->body[0].iov_base; + + const char *begin = row->body[0].iov_base; + const char *end = begin + row->body[0].iov_len; + const char *pos = begin; uint32_t map_size = mp_decode_map(&pos); for (uint32_t i = 0; i < map_size; ++i) { + if (mp_typeof(*pos) != MP_UINT) + goto bad_msgpack; uint64_t key = mp_decode_uint(&pos); switch (key) { case IPROTO_RAFT_TERM: + if (mp_typeof(*pos) != MP_UINT) + goto bad_msgpack; r->term = mp_decode_uint(&pos); break; case IPROTO_RAFT_VOTE: + if (mp_typeof(*pos) != MP_UINT) + goto bad_msgpack; r->vote = mp_decode_uint(&pos); break; + case IPROTO_RAFT_STATE: + if (mp_typeof(*pos) != MP_UINT) + goto bad_msgpack; + r->state = mp_decode_uint(&pos); + break; + case IPROTO_RAFT_VCLOCK: + r->vclock = vclock; + if (r->vclock == NULL) + mp_next(&pos); + else if (mp_decode_vclock_ignore0(&pos, r->vclock) != 0) + goto bad_msgpack; + break; default: mp_next(&pos); break; } } return 0; + +bad_msgpack: + xrow_on_decode_err(begin, end, ER_INVALID_MSGPACK, "raft body"); + return -1; } int diff --git a/src/box/xrow.h b/src/box/xrow.h index 1740df6141..25985ad7fd 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -267,6 +267,8 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req); struct raft_request { uint64_t 
term; uint32_t vote; + uint32_t state; + struct vclock *vclock; }; int @@ -274,7 +276,8 @@ xrow_encode_raft(struct xrow_header *row, struct region *region, const struct raft_request *r); int -xrow_decode_raft(const struct xrow_header *row, struct raft_request *r); +xrow_decode_raft(const struct xrow_header *row, struct raft_request *r, + struct vclock *vclock); /** * CALL/EVAL request. -- GitLab