diff --git a/src/box/applier.cc b/src/box/applier.cc
index 1a0b55640a35dacd94ec22e9cc4ec7e192c4e491..9fed3c071f6bfbd1ddc367f810e77395a7cb157e 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -55,6 +55,7 @@
 #include "scoped_guard.h"
 #include "txn_limbo.h"
 #include "journal.h"
+#include "raft.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -878,6 +879,18 @@ apply_synchro_row(struct xrow_header *row)
 	return -1;
 }
 
+static int
+applier_handle_raft(struct applier *applier, struct xrow_header *row)
+{
+	assert(iproto_type_is_raft_request(row->type));
+
+	struct raft_request req;
+	struct vclock candidate_clock;
+	if (xrow_decode_raft(row, &req, &candidate_clock) != 0)
+		return -1;
+	return raft_process_msg(&req, applier->instance_id);
+}
+
 /**
  * Apply all rows in the rows queue as a single transaction.
  *
@@ -1222,11 +1235,20 @@ applier_subscribe(struct applier *applier)
 		 * In case of an heartbeat message wake a writer up
 		 * and check applier state.
 		 */
-		if (stailq_first_entry(&rows, struct applier_tx_row,
-				       next)->row.lsn == 0)
+		struct xrow_header *first_row =
+			&stailq_first_entry(&rows, struct applier_tx_row,
+					    next)->row;
+		if (first_row->lsn == 0) {
+			if (unlikely(iproto_type_is_raft_request(
+							first_row->type))) {
+				if (applier_handle_raft(applier,
+							first_row) != 0)
+					diag_raise();
+			}
 			applier_signal_ack(applier);
-		else if (applier_apply_tx(&rows) != 0)
+		} else if (applier_apply_tx(&rows) != 0) {
 			diag_raise();
+		}
 
 		if (ibuf_used(ibuf) == 0)
 			ibuf_reset(ibuf);
diff --git a/src/box/box.cc b/src/box/box.cc
index 99a15bfd0726ec81f93566e074f65f3906f214a7..a8542cb38ed02616b3df8017de8d5fdcebfbb16c 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -387,7 +387,8 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
 	}
 	if (iproto_type_is_raft_request(row->type)) {
 		struct raft_request raft_req;
-		if (xrow_decode_raft(row, &raft_req) != 0)
+		/* Vclock is never persisted in WAL by Raft. */
+		if (xrow_decode_raft(row, &raft_req, NULL) != 0)
 			diag_raise();
 		raft_process_recovery(&raft_req);
 		return;
@@ -2146,7 +2147,19 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 		  tt_uuid_str(&replica_uuid), sio_socketname(io->fd));
 	say_info("remote vclock %s local vclock %s",
 		 vclock_to_string(&replica_clock), vclock_to_string(&vclock));
-
+	if (raft_is_enabled()) {
+		/*
+		 * Send out the current raft state of the instance. Don't do
+		 * that if Raft is disabled. It can be that a part of the
+		 * cluster still contains old versions, which can't handle Raft
+		 * messages. So when it is disabled, its network footprint
+		 * should be 0.
+		 */
+		struct raft_request req;
+		raft_serialize_for_network(&req, &vclock);
+		xrow_encode_raft(&row, &fiber()->gc, &req);
+		coio_write_xrow(io, &row);
+	}
 	/*
 	 * Replica clock is used in gc state and recovery
 	 * initialization, so we need to replace the remote 0-th
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index f9347f555091032a0f992103bc9c52e168d4d27f..d3738c7050ec6b3bf4db2d46471bccb079d87473 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -264,6 +264,8 @@ extern const char *iproto_type_strs[];
 enum iproto_raft_keys {
 	IPROTO_RAFT_TERM = 0,
 	IPROTO_RAFT_VOTE = 1,
+	IPROTO_RAFT_STATE = 2,
+	IPROTO_RAFT_VCLOCK = 3,
 };
 
 /**
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 2f38c2647f785466641e536615c87f2dc3528d7c..8147557f68e081b4e3c78776ccc3a5324030938b 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -207,7 +207,8 @@ memtx_engine_recover_raft(const struct xrow_header *row)
 {
 	assert(row->type == IPROTO_RAFT);
 	struct raft_request req;
-	if (xrow_decode_raft(row, &req) != 0)
+	/* Vclock is never persisted in WAL by Raft. */
+	if (xrow_decode_raft(row, &req, NULL) != 0)
 		return -1;
 	raft_process_recovery(&req);
 	return 0;
diff --git a/src/box/raft.c b/src/box/raft.c
index ee54d02b7708882b3dd010a352ec1b5f9c376c9f..024433369f19e83d5b81e67be7a95f8e72a2cef5 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -34,9 +34,20 @@
 #include "journal.h"
 #include "xrow.h"
 #include "small/region.h"
+#include "replication.h"
+#include "relay.h"
+
+const char *raft_state_strs[] = {
+	NULL,
+	"follower",
+	"candidate",
+	"leader",
+};
 
 /** Raft state of this instance. */
 struct raft raft = {
	.leader = 0,
+	.state = RAFT_STATE_FOLLOWER,
 	.is_enabled = false,
 	.is_candidate = false,
 	.term = 1,
@@ -50,18 +61,71 @@ raft_process_recovery(const struct raft_request *req)
 		raft.term = req->term;
 	if (req->vote != 0)
 		raft.vote = req->vote;
+	/*
+	 * Role is never persisted. If recovery is happening, the
+	 * node was restarted, and the former role can be false
+	 * anyway.
+	 */
+	assert(req->state == 0);
+	/*
+	 * Vclock is always persisted by some other subsystem - WAL, snapshot.
+	 * It is used only to decide to whom to give the vote during election,
+	 * as a part of the volatile state.
+	 */
+	assert(req->vclock == NULL);
+	/* Raft is not enabled until recovery is finished. */
+	assert(!raft_is_enabled());
+}
+
+int
+raft_process_msg(const struct raft_request *req, uint32_t source)
+{
+	(void)source;
+	if (req->term > raft.term) {
+		// Update term.
+		// The logic will be similar, but the code
+		// below is for testing purposes.
+		raft.term = req->term;
+	}
+	if (req->vote > 0) {
+		// Check whether the vote is for us.
+	}
+	switch (req->state) {
+	case RAFT_STATE_FOLLOWER:
+		break;
+	case RAFT_STATE_CANDIDATE:
+		// Perform voting logic.
+		break;
+	case RAFT_STATE_LEADER:
+		// Switch to a new leader.
+		break;
+	default:
+		break;
+	}
+	return 0;
 }
 
 void
-raft_serialize_for_network(struct raft_request *req)
+raft_serialize_for_network(struct raft_request *req, struct vclock *vclock)
 {
+	memset(req, 0, sizeof(*req));
 	req->term = raft.term;
 	req->vote = raft.vote;
+	req->state = raft.state;
+	/*
+	 * Raft does not own the vclock, so it always expects it passed
+	 * externally. Vclock is sent out only by candidate instances.
+	 */
+	if (req->state == RAFT_STATE_CANDIDATE) {
+		req->vclock = vclock;
+		vclock_copy(vclock, &replicaset.vclock);
+	}
 }
 
 void
 raft_serialize_for_disk(struct raft_request *req)
 {
+	memset(req, 0, sizeof(*req));
 	req->term = raft.term;
 	req->vote = raft.vote;
 }
@@ -93,3 +157,10 @@ void
 raft_cfg_death_timeout(void)
 {
 }
+
+void
+raft_broadcast(const struct raft_request *req)
+{
+	replicaset_foreach(replica)
+		relay_push_raft(replica->relay, req);
+}
diff --git a/src/box/raft.h b/src/box/raft.h
index f272227528d985d4e041fae3dc2b596dd643d43e..8abde4f4c276b049245578977432d2a52561cc25 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -37,8 +37,19 @@ extern "C" {
 #endif
 
 struct raft_request;
+struct vclock;
+
+enum raft_state {
+	RAFT_STATE_FOLLOWER = 1,
+	RAFT_STATE_CANDIDATE = 2,
+	RAFT_STATE_LEADER = 3,
+};
+
+extern const char *raft_state_strs[];
 
 struct raft {
+	uint32_t leader;
+	enum raft_state state;
 	bool is_enabled;
 	bool is_candidate;
 	uint64_t term;
@@ -48,10 +59,25 @@ struct raft {
 
 extern struct raft raft;
 
+/** Check if Raft is enabled. */
+static inline bool
+raft_is_enabled(void)
+{
+	return raft.is_enabled;
+}
+
 /** Process a raft entry stored in WAL/snapshot. */
 void
 raft_process_recovery(const struct raft_request *req);
 
+/**
+ * Process a raft status message coming from the network.
+ * @param req Raft request.
+ * @param source Instance ID of the message sender.
+ */
+int
+raft_process_msg(const struct raft_request *req, uint32_t source);
+
 /** Configure whether Raft is enabled. */
 void
 raft_cfg_is_enabled(bool is_enabled);
@@ -88,7 +114,7 @@ raft_cfg_death_timeout(void);
  * cluster. It is allowed to save anything here, not only persistent state.
  */
 void
-raft_serialize_for_network(struct raft_request *req);
+raft_serialize_for_network(struct raft_request *req, struct vclock *vclock);
 
 /**
  * Save complete Raft state into a request to be persisted on disk. Only term
@@ -97,6 +123,13 @@ raft_serialize_for_disk(struct raft_request *req);
 
+/**
+ * Broadcast the changes in this instance's raft status to all
+ * the followers.
+ */
+void
+raft_broadcast(const struct raft_request *req);
+
 #if defined(__cplusplus)
 }
 #endif
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 124b0f52f5f4e619feb11e4665430de0e634a22b..76430caa68c0839b4d618932994c96b942fe55b0 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -53,6 +53,7 @@
 #include "xstream.h"
 #include "wal.h"
 #include "txn_limbo.h"
+#include "raft.h"
 
 /**
  * Cbus message to send status updates from relay to tx thread.
  */
@@ -145,6 +146,12 @@ struct relay {
 		alignas(CACHELINE_SIZE)
 		/** Known relay vclock. */
 		struct vclock vclock;
+		/**
+		 * True if the relay needs Raft updates. It can live fine
+		 * without sending Raft updates, if it is a relay to an
+		 * anonymous replica, for example.
+		 */
+		bool is_raft_enabled;
 	} tx;
 };
 
@@ -572,6 +579,74 @@ relay_send_heartbeat(struct relay *relay)
 	}
 }
 
+/** A message to set Raft enabled flag in TX thread from a relay thread. */
+struct relay_is_raft_enabled_msg {
+	/** Base cbus message. */
+	struct cmsg base;
+	/**
+	 * First hop - TX thread, second hop - a relay thread, to notify about
+	 * the flag being set.
+	 */
+	struct cmsg_hop route[2];
+	/** Relay pointer to set the flag in. */
+	struct relay *relay;
+	/** New flag value. */
+	bool value;
+	/** Flag to wait for the value being set, in a relay thread. */
+	bool is_finished;
+};
+
+/** TX thread part of the Raft flag setting, first hop. */
+static void
+tx_set_is_raft_enabled(struct cmsg *base)
+{
+	struct relay_is_raft_enabled_msg *msg =
+		(struct relay_is_raft_enabled_msg *)base;
+	msg->relay->tx.is_raft_enabled = msg->value;
+}
+
+/** Relay thread part of the Raft flag setting, second hop. */
+static void
+relay_set_is_raft_enabled(struct cmsg *base)
+{
+	struct relay_is_raft_enabled_msg *msg =
+		(struct relay_is_raft_enabled_msg *)base;
+	msg->is_finished = true;
+}
+
+/**
+ * Set relay Raft enabled flag from a relay thread to be accessed by the TX
+ * thread.
+ */
+static void
+relay_send_is_raft_enabled(struct relay *relay,
+			   struct relay_is_raft_enabled_msg *msg, bool value)
+{
+	msg->route[0].f = tx_set_is_raft_enabled;
+	msg->route[0].pipe = &relay->relay_pipe;
+	msg->route[1].f = relay_set_is_raft_enabled;
+	msg->route[1].pipe = NULL;
+	msg->relay = relay;
+	msg->value = value;
+	msg->is_finished = false;
+	cmsg_init(&msg->base, msg->route);
+	cpipe_push(&relay->tx_pipe, &msg->base);
+	/*
+	 * cbus_call() can't be used, because it works only if the sender
+	 * thread is a simple cbus_process() loop. But the relay thread is
+	 * not - it calls cbus_process() manually when it is ready, and its
+	 * loop is driven by wakeups of the main fiber. So cbus_call() would
+	 * just hang, because cbus_process() wouldn't be called by the
+	 * scheduler fiber.
+	 */
+	while (!msg->is_finished) {
+		cbus_process(&relay->endpoint);
+		if (msg->is_finished)
+			break;
+		fiber_yield();
+	}
+}
+
 /**
  * A libev callback invoked when a relay client socket is ready
  * for read. This currently only happens when the client closes
@@ -592,6 +667,10 @@ relay_subscribe_f(va_list ap)
 	cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
 		  &relay->relay_pipe, NULL, NULL, cbus_process);
 
+	struct relay_is_raft_enabled_msg raft_enabler;
+	if (!relay->replica->anon)
+		relay_send_is_raft_enabled(relay, &raft_enabler, true);
+
 	/*
 	 * Setup garbage collection trigger.
 	 * Not needed for anonymous replicas, since they
@@ -671,6 +750,9 @@ relay_subscribe_f(va_list ap)
 		cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
 	}
 
+	if (!relay->replica->anon)
+		relay_send_is_raft_enabled(relay, &raft_enabler, false);
+
 	/*
 	 * Log the error that caused the relay to break the loop.
 	 * Don't clear the error for status reporting.
@@ -770,13 +852,75 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
 	relay_send(relay, row);
 }
 
+struct relay_raft_msg {
+	struct cmsg base;
+	struct cmsg_hop route;
+	struct raft_request req;
+	struct vclock vclock;
+	struct relay *relay;
+};
+
+static void
+relay_raft_msg_push(struct cmsg *base)
+{
+	struct relay_raft_msg *msg = (struct relay_raft_msg *)base;
+	struct xrow_header row;
+	xrow_encode_raft(&row, &fiber()->gc, &msg->req);
+	try {
+		relay_send(msg->relay, &row);
+	} catch (Exception *e) {
+		relay_set_error(msg->relay, e);
+		fiber_cancel(fiber());
+	}
+	free(msg);
+}
+
+void
+relay_push_raft(struct relay *relay, const struct raft_request *req)
+{
+	/*
+	 * Raft updates don't stack. They are thrown away if they can't be
+	 * pushed right now. This is fine, as long as relays live much
+	 * longer than the timeouts used by Raft.
+	 */
+	if (!relay->tx.is_raft_enabled)
+		return;
+	/*
+	 * XXX: the message should be preallocated. It should
+	 * work like Kharon in IProto. Relay should have 2 raft
+	 * messages rotating. When one is sent, the other can be
+	 * updated and a flag is set. When the first message is
+	 * sent, the control returns to TX thread, sees the set
+	 * flag, rotates the buffers, and sends it again. And so
+	 * on. This is how it can work in the future, with 0 heap
+	 * allocations. Current solution with alloc-per-update is
+	 * good enough as a start. Another option - wait until all
+	 * is moved to WAL thread, where this will all happen
+	 * in one thread and will be much simpler.
+	 */
+	struct relay_raft_msg *msg =
+		(struct relay_raft_msg *)malloc(sizeof(*msg));
+	if (msg == NULL) {
+		panic("Couldn't allocate raft message");
+		return;
+	}
+	msg->req = *req;
+	if (req->vclock != NULL) {
+		msg->req.vclock = &msg->vclock;
+		vclock_copy(&msg->vclock, req->vclock);
+	}
+	msg->route.f = relay_raft_msg_push;
+	msg->route.pipe = NULL;
+	cmsg_init(&msg->base, &msg->route);
+	msg->relay = relay;
+	cpipe_push(&relay->relay_pipe, &msg->base);
+}
+
 /** Send a single row to the client. */
 static void
 relay_send_row(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
-	assert(iproto_type_is_dml(packet->type) ||
-	       iproto_type_is_synchro_request(packet->type));
 	if (packet->group_id == GROUP_LOCAL) {
 		/*
 		 * We do not relay replica-local rows to other
@@ -793,6 +937,8 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 		packet->group_id = GROUP_DEFAULT;
 		packet->bodycnt = 0;
 	}
+	assert(iproto_type_is_dml(packet->type) ||
+	       iproto_type_is_synchro_request(packet->type));
 	/* Check if the rows from the instance are filtered. */
 	if ((1 << packet->replica_id & relay->id_filter) != 0)
 		return;
diff --git a/src/box/relay.h b/src/box/relay.h
index 0632fa91275f998e2adb003a97bdc5b9ec9bf3a1..b32e2ea2ac7e33cbedb3919bba457f195fd7aea6 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -93,6 +93,13 @@ relay_vclock(const struct relay *relay);
 double
 relay_last_row_time(const struct relay *relay);
 
+/**
+ * Send a Raft update request to the relay channel. It is not
+ * guaranteed that it will be delivered. The connection may break.
+ */
+void
+relay_push_raft(struct relay *relay, const struct raft_request *req);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index b9bbb19a0de11511e69613e2c77c510e6f722ffe..da5c6ffaea7bc440b777a73bd002188bcf3c8d33 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -962,11 +962,30 @@ int
 xrow_encode_raft(struct xrow_header *row, struct region *region,
 		 const struct raft_request *r)
 {
-	size_t size = mp_sizeof_map(2) +
-		      mp_sizeof_uint(IPROTO_RAFT_TERM) +
-		      mp_sizeof_uint(r->term) +
-		      mp_sizeof_uint(IPROTO_RAFT_VOTE) +
-		      mp_sizeof_uint(r->vote);
+	/*
+	 * Term is always encoded. Sometimes the rest can even be ignored
+	 * if the term is too old.
+	 */
+	int map_size = 1;
+	size_t size = mp_sizeof_uint(IPROTO_RAFT_TERM) +
+		      mp_sizeof_uint(r->term);
+	if (r->vote != 0) {
+		++map_size;
+		size += mp_sizeof_uint(IPROTO_RAFT_VOTE) +
+			mp_sizeof_uint(r->vote);
+	}
+	if (r->state != 0) {
+		++map_size;
+		size += mp_sizeof_uint(IPROTO_RAFT_STATE) +
+			mp_sizeof_uint(r->state);
+	}
+	if (r->vclock != NULL) {
+		++map_size;
+		size += mp_sizeof_uint(IPROTO_RAFT_VCLOCK) +
+			mp_sizeof_vclock_ignore0(r->vclock);
+	}
+	size += mp_sizeof_map(map_size);
+
 	char *buf = region_alloc(region, size);
 	if (buf == NULL) {
 		diag_set(OutOfMemory, size, "region_alloc", "buf");
@@ -975,43 +994,83 @@
 	memset(row, 0, sizeof(*row));
 	row->type = IPROTO_RAFT;
 	row->body[0].iov_base = buf;
-	row->body[0].iov_len = size;
 	row->group_id = GROUP_LOCAL;
 	row->bodycnt = 1;
-	buf = mp_encode_map(buf, 2);
+	const char *begin = buf;
+
+	buf = mp_encode_map(buf, map_size);
 	buf = mp_encode_uint(buf, IPROTO_RAFT_TERM);
 	buf = mp_encode_uint(buf, r->term);
-	buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
-	buf = mp_encode_uint(buf, r->vote);
+	if (r->vote != 0) {
+		buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
+		buf = mp_encode_uint(buf, r->vote);
+	}
+	if (r->state != 0) {
+		buf = mp_encode_uint(buf, IPROTO_RAFT_STATE);
+		buf = mp_encode_uint(buf, r->state);
+	}
+	if (r->vclock != NULL) {
+		buf = mp_encode_uint(buf, IPROTO_RAFT_VCLOCK);
+		buf = mp_encode_vclock_ignore0(buf, r->vclock);
+	}
+	row->body[0].iov_len = buf - begin;
 	return 0;
 }
 
 int
-xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r,
+		 struct vclock *vclock)
 {
-	/* TODO: handle bad format. */
 	assert(row->type == IPROTO_RAFT);
-	assert(row->bodycnt == 1);
-	assert(row->group_id == GROUP_LOCAL);
+	if (row->bodycnt != 1 || row->group_id != GROUP_LOCAL) {
+		diag_set(ClientError, ER_INVALID_MSGPACK,
+			 "malformed raft request");
+		return -1;
+	}
 	memset(r, 0, sizeof(*r));
-	const char *pos = row->body[0].iov_base;
+
+	const char *begin = row->body[0].iov_base;
+	const char *end = begin + row->body[0].iov_len;
+	const char *pos = begin;
 	uint32_t map_size = mp_decode_map(&pos);
 	for (uint32_t i = 0; i < map_size; ++i) {
+		if (mp_typeof(*pos) != MP_UINT)
+			goto bad_msgpack;
 		uint64_t key = mp_decode_uint(&pos);
 		switch (key) {
 		case IPROTO_RAFT_TERM:
+			if (mp_typeof(*pos) != MP_UINT)
+				goto bad_msgpack;
 			r->term = mp_decode_uint(&pos);
 			break;
 		case IPROTO_RAFT_VOTE:
+			if (mp_typeof(*pos) != MP_UINT)
+				goto bad_msgpack;
 			r->vote = mp_decode_uint(&pos);
 			break;
+		case IPROTO_RAFT_STATE:
+			if (mp_typeof(*pos) != MP_UINT)
+				goto bad_msgpack;
+			r->state = mp_decode_uint(&pos);
+			break;
+		case IPROTO_RAFT_VCLOCK:
+			r->vclock = vclock;
+			if (r->vclock == NULL)
+				mp_next(&pos);
+			else if (mp_decode_vclock_ignore0(&pos, r->vclock) != 0)
+				goto bad_msgpack;
+			break;
 		default:
 			mp_next(&pos);
 			break;
 		}
 	}
 	return 0;
+
+bad_msgpack:
+	xrow_on_decode_err(begin, end, ER_INVALID_MSGPACK, "raft body");
+	return -1;
 }
 
 int
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 1740df61411bc2b46d09a43ccd7834f2fedd7105..25985ad7fd05d23cd0618e3f48d385aaea52ec22 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -267,6 +267,8 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
 struct raft_request {
 	uint64_t term;
 	uint32_t vote;
+	uint32_t state;
+	struct vclock *vclock;
 };
 
 int
@@ -274,7 +276,8 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
 		 const struct raft_request *r);
 
 int
-xrow_decode_raft(const struct xrow_header *row, struct raft_request *r);
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r,
+		 struct vclock *vclock);
 
 /**
  * CALL/EVAL request.
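
Reviewer note, not part of the patch: the sketch below shows how the new IPROTO_RAFT body introduced above is expected to round-trip through xrow_encode_raft() and xrow_decode_raft(). The term is always encoded, vote/state/vclock appear only when set, and the decoder fills caller-provided vclock storage the same way applier_handle_raft() does. The helper name encode_decode_candidate() and the include paths are illustrative assumptions, not code from this series; a real caller also needs an initialized fiber region such as &fiber()->gc to pass in.

/* Illustration only: round-trip of a candidate's Raft state. */
#include <assert.h>
#include <string.h>
#include "box/xrow.h"      /* struct raft_request, xrow_*_raft() */
#include "box/raft.h"      /* enum raft_state */
#include "box/vclock.h"    /* struct vclock, vclock_create() */

static int
encode_decode_candidate(struct region *region)
{
	struct vclock candidate_clock;
	vclock_create(&candidate_clock);

	/* What a candidate would send: term + vote + state + vclock. */
	struct raft_request out;
	memset(&out, 0, sizeof(out));
	out.term = 5;
	out.vote = 3;			/* instance id the vote went to */
	out.state = RAFT_STATE_CANDIDATE;
	out.vclock = &candidate_clock;

	struct xrow_header row;
	if (xrow_encode_raft(&row, region, &out) != 0)
		return -1;

	/*
	 * What a follower decodes. The vclock storage is provided by the
	 * caller, exactly like candidate_clock in applier_handle_raft().
	 */
	struct raft_request in;
	struct vclock decoded_clock;
	if (xrow_decode_raft(&row, &in, &decoded_clock) != 0)
		return -1;

	/*
	 * Only the keys that were actually encoded come back non-zero.
	 * A heartbeat-like request carrying just a term would decode with
	 * in.vote == 0, in.state == 0 and in.vclock == NULL.
	 */
	assert(in.term == 5 && in.vote == 3);
	assert(in.state == RAFT_STATE_CANDIDATE);
	assert(in.vclock == &decoded_clock);
	return 0;
}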