From 07e20b9cd77bf4eb78df17980dc98d3b5601c7f2 Mon Sep 17 00:00:00 2001 From: Konstantin Osipov <kostja@tarantool.org> Date: Tue, 2 Jun 2015 18:56:19 +0300 Subject: [PATCH] replication: fix gh-853 "incomprehensible error message" When handling join, an error from box_on_cluster_join() was ignored and not sent to the client, so the client believed that the JOIN was handled successfully and proceeded with SUBSCRIBE. Before sending OK to join, execute box_on_cluster_join(), if it fails, the client sees an error and stops. Make ER_REPLICA_MAX a logged error. No test case since the test requires a fail-stop test of the replica, which is clumsy in our framework. --- src/box/box.cc | 8 ++------ src/box/replication.cc | 29 +++++++++++++++++++---------- src/box/replication.h | 3 ++- src/box/vclock.h | 2 +- 4 files changed, 24 insertions(+), 18 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index fb83de3f3b..500c72c6d0 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -316,7 +316,7 @@ box_on_cluster_join(const tt_uuid *server_uuid) /** Assign a new server id. */ uint32_t server_id = tuple ? tuple_field_u32(tuple, 0) + 1 : 1; if (server_id >= VCLOCK_MAX) - tnt_raise(ClientError, ER_REPLICA_MAX, server_id); + tnt_raise(LoggedError, ER_REPLICA_MAX, server_id); boxk(IPROTO_INSERT, SC_CLUSTER_ID, "%u%s", (unsigned) server_id, tt_uuid_str(server_uuid)); @@ -330,13 +330,9 @@ box_process_join(int fd, struct xrow_header *header) access_check_space(space_cache_find(SC_CLUSTER_ID), PRIV_W); assert(header->type == IPROTO_JOIN); - struct tt_uuid server_uuid = uuid_nil; - xrow_decode_join(header, &server_uuid); /* Process JOIN request via replication relay */ - replication_join(fd, header); - /** Register the server with the cluster. */ - box_on_cluster_join(&server_uuid); + replication_join(fd, header, box_on_cluster_join); } void diff --git a/src/box/replication.cc b/src/box/replication.cc index 10db203b4e..d376f44b96 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -82,31 +82,40 @@ void replication_join_f(va_list ap) { Relay *relay = va_arg(ap, Relay *); - struct recovery_state *r = relay->r; relay_set_cord_name(relay->io.fd); /* Send snapshot */ engine_join(relay); - /* Send response to JOIN command = end of stream */ - struct xrow_header row; - xrow_encode_vclock(&row, vclockset_last(&r->snap_dir.index)); - row.sync = relay->sync; - struct iovec iov[XROW_IOVMAX]; - int iovcnt = xrow_to_iovec(&row, iov); - coio_writev(&relay->io, iov, iovcnt, 0); - say_info("snapshot sent"); } void -replication_join(int fd, struct xrow_header *packet) +replication_join(int fd, struct xrow_header *packet, + void (*on_join)(const struct tt_uuid *)) { Relay relay(fd, packet->sync); + struct recovery_state *r = relay.r; + + struct tt_uuid server_uuid = uuid_nil; + xrow_decode_join(packet, &server_uuid); struct cord cord; cord_costart(&cord, "join", replication_join_f, &relay); cord_cojoin(&cord); + /** + * Call the server-side hook which stores the replica uuid + * in _cluster hook after sending the last row but before + * sending OK - if the hook fails, the error reaches the + * client. + */ + on_join(&server_uuid); + + /* Send response to JOIN command = end of stream */ + struct xrow_header row; + xrow_encode_vclock(&row, vclockset_last(&r->snap_dir.index)); + relay_send(&relay, &row); + say_info("snapshot sent"); } static void diff --git a/src/box/replication.h b/src/box/replication.h index d84c70c544..02873228ef 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -46,7 +46,8 @@ class Relay { }; void -replication_join(int fd, struct xrow_header *packet); +replication_join(int fd, struct xrow_header *packet, + void (*on_join)(const struct tt_uuid *)); /** * Subscribe a replica to updates. diff --git a/src/box/vclock.h b/src/box/vclock.h index e0cbe1952b..a8e9d6303e 100644 --- a/src/box/vclock.h +++ b/src/box/vclock.h @@ -281,7 +281,7 @@ static inline void vclock_add_server(struct vclock *vclock, uint32_t server_id) { if (server_id >= VCLOCK_MAX) - tnt_raise(ClientError, ER_REPLICA_MAX, server_id); + tnt_raise(LoggedError, ER_REPLICA_MAX, server_id); assert(! vclock_has(vclock, server_id)); vclock_add_server_nothrow(vclock, server_id); } -- GitLab