From 73e445033ba65c5c844ce96dd49a2b101d76adaa Mon Sep 17 00:00:00 2001 From: Konstantin Osipov <kostja@tarantool.org> Date: Sat, 5 Sep 2015 13:25:06 +0300 Subject: [PATCH] replication: send vclock packet in subscribe handshake --- src/box/box.cc | 16 ++++++++++-- src/box/relay.cc | 33 +++++++++++++++++++------ src/box/relay.h | 6 +++-- src/box/replica.cc | 43 +++++++++++++++++---------------- test/replication/catch.result | 24 ++++++++++-------- test/replication/catch.test.lua | 25 ++++++++++--------- 6 files changed, 93 insertions(+), 54 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index 96fd828bce..f2678bb7de 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -553,8 +553,20 @@ box_process_subscribe(int fd, struct xrow_header *header) /* Check permissions */ access_check_universe(PRIV_R); - /* process SUBSCRIBE request via replication relay */ - relay_subscribe(fd, header); + /* + * Process SUBSCRIBE request via replication relay + * Send current recovery vector clock as a marker + * of the "current" state of the master. When + * replica fetches rows up to this position, + * it enters read-write mode. + * + * @todo: this is not implemented, this is imperfect, and + * this is buggy in case there is rollback followed by + * a stall in updates (in this case replica may hang + * indefinitely). + */ + relay_subscribe(fd, header, recovery->server_id, + &recovery->vclock); } /** Replace the current server id in _cluster */ diff --git a/src/box/relay.cc b/src/box/relay.cc index edf0ae3601..220836abb6 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -94,7 +94,7 @@ relay_join_f(va_list ap) void relay_join(int fd, struct xrow_header *packet, - uint32_t server_id, + uint32_t master_server_id, void (*on_join)(const struct tt_uuid *)) { Relay relay(fd, packet->sync); @@ -124,7 +124,7 @@ relay_join(int fd, struct xrow_header *packet, * server, this is the only way for a replica to find * out the id of the server it has connected to. */ - row.server_id = server_id; + row.server_id = master_server_id; relay_send(&relay, &row); } @@ -197,7 +197,9 @@ relay_subscribe_f(va_list ap) /** Replication acceptor fiber handler. */ void -relay_subscribe(int fd, struct xrow_header *packet) +relay_subscribe(int fd, struct xrow_header *packet, + uint32_t master_server_id, + struct vclock *master_vclock) { Relay relay(fd, packet->sync); @@ -224,6 +226,20 @@ relay_subscribe(int fd, struct xrow_header *packet) tnt_raise(ClientError, ER_UNKNOWN_SERVER, tt_uuid_str(&server_uuid)); } + /* + * Send a response to SUBSCRIBE request, tell + * the replica how many rows we have in stock for it, + * and identify ourselves with our own server id. + */ + struct xrow_header row; + xrow_encode_vclock(&row, master_vclock); + /* + * Identify the message with the server id of this + * server, this is the only way for a replica to find + * out the id of the server it has connected to. + */ + row.server_id = master_server_id; + relay_send(&relay, &row); struct cord cord; cord_costart(&cord, "subscribe", relay_subscribe_f, &relay); @@ -237,10 +253,6 @@ relay_send(Relay *relay, struct xrow_header *packet) struct iovec iov[XROW_IOVMAX]; int iovcnt = xrow_to_iovec(packet, iov); coio_writev(&relay->io, iov, iovcnt, 0); - ERROR_INJECT(ERRINJ_RELAY, - { - sleep(1000); - }); } /** Send a single row to the client. */ @@ -259,8 +271,13 @@ relay_send_row(struct recovery_state *r, void *param, * it is not from the same server (i.e. don't send * replica's own rows back). */ - if (packet->server_id == 0 || packet->server_id != r->server_id) + if (packet->server_id == 0 || packet->server_id != r->server_id) { relay_send(relay, packet); + ERROR_INJECT(ERRINJ_RELAY, + { + fiber_sleep(1000.0); + }); + } /* * Update local vclock. During normal operation wal_write() * updates local vclock. In relay mode we have to update diff --git a/src/box/relay.h b/src/box/relay.h index 050be5bba3..7ca306c814 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ -68,7 +68,7 @@ class Relay { */ void relay_join(int fd, struct xrow_header *packet, - uint32_t server_id, + uint32_t master_server_id, void (*on_join)(const struct tt_uuid *)); /** @@ -77,7 +77,9 @@ relay_join(int fd, struct xrow_header *packet, * @return none. */ void -relay_subscribe(int fd, struct xrow_header *packet); +relay_subscribe(int fd, struct xrow_header *packet, + uint32_t master_server_id, + struct vclock *master_vclock); void relay_send(Relay *relay, struct xrow_header *packet); diff --git a/src/box/replica.cc b/src/box/replica.cc index 2c748f34b2..01206312b6 100644 --- a/src/box/replica.cc +++ b/src/box/replica.cc @@ -145,8 +145,8 @@ replica_connect(struct replica *replica, struct ev_io *coio, * Execute and process JOIN request (bootstrap the server). */ static void -replica_process_join(struct replica *replica, struct recovery_state *r, - struct ev_io *coio, struct iobuf *iobuf) +replica_join(struct replica *replica, struct recovery_state *r, + struct ev_io *coio, struct iobuf *iobuf) { say_info("downloading a snapshot from %s", sio_strfaddr(&replica->addr, replica->addr_len)); @@ -191,8 +191,8 @@ replica_process_join(struct replica *replica, struct recovery_state *r, * Execute and process SUBSCRIBE request (follow updates from a master). */ static void -replica_process_subscribe(struct replica *replica, struct recovery_state *r, - struct ev_io *coio, struct iobuf *iobuf) +replica_subscribe(struct replica *replica, struct recovery_state *r, + struct ev_io *coio, struct iobuf *iobuf) { /* Send SUBSCRIBE request */ struct xrow_header row; @@ -225,7 +225,7 @@ replica_process_subscribe(struct replica *replica, struct recovery_state *r, /** * Write a nice error message to log file on SocketError or ClientError - * in pull_from_replica(). + * in replica_f(). */ static inline void replica_log_exception(struct replica *replica, Exception *e) @@ -254,7 +254,7 @@ replica_log_exception(struct replica *replica, Exception *e) } static void -pull_from_replica(va_list ap) +replica_f(va_list ap) { struct replica *replica = va_arg(ap, struct replica *); struct recovery_state *r = va_arg(ap, struct recovery_state *); @@ -269,24 +269,25 @@ pull_from_replica(va_list ap) try { if (coio->fd < 0) replica_connect(replica, coio, iobuf); - /* - * Execute JOIN if recovery is not finalized yet - * and SUBSCRIBE otherwise. + * Execute JOIN if this is a bootstrap, and + * there is no snapshot, and SUBSCRIBE + * otherwise. */ if (r->writer == NULL) { - replica_process_join(replica, r, coio, iobuf); - ev_io_stop(loop(), coio); - /* keep connection */ - return; + replica_join(replica, r, coio, iobuf); + } else { + replica_subscribe(replica, r, coio, iobuf); + /* + * subscribe() has an infinite + * loop which is stoppable only + * with fiber_cancel() + */ + assert(0); } - replica_process_subscribe(replica, r, coio, iobuf); - /* - * process_subscribe() has an infinity loop and - * can be stopped only using fiber_cancel() - */ - assert(0); /* unreachable */ - break; + ev_io_stop(loop(), coio); + /* Don't close the socket */ + return; } catch (ClientError *e) { replica_log_exception(replica, e); evio_close(loop, coio); @@ -346,7 +347,7 @@ replica_start(struct replica *replica, struct recovery_state *r) say_crit("starting replication from %s", uri); snprintf(name, sizeof(name), "replica/%s", uri); - struct fiber *f = fiber_new(name, pull_from_replica); + struct fiber *f = fiber_new(name, replica_f); /** * So that we can safely grab the status of the * fiber any time we want. diff --git a/test/replication/catch.result b/test/replication/catch.result index 76b5554dac..0eba84191d 100644 --- a/test/replication/catch.result +++ b/test/replication/catch.result @@ -29,7 +29,7 @@ while box.space.test == nil do fiber.sleep(0.01) end ... --# set connection default --# stop server replica --- insert values on the master while replica os stopped and can't fetch them +-- insert values on the master while replica is stopped and can't fetch them for i=1,100 do s:insert{i, 'this is test message12345'} end --- ... @@ -40,14 +40,16 @@ errinj.set("ERRINJ_RELAY", true) ... --# start server replica --# set connection replica --- Check that replica doesn't enter read-write mode --- before catching up with the master: to check that we inject --- sleep into the master relay_send function and attempt a data --- modifying statement in replica while it's still fetching --- data from the master. --- In next 2 cases we try to delete tuple --- during fetching process(local delete, remote delete) --- case #1: delete tuple in replica +-- Check that replica doesn't enter read-write mode before +-- catching up with the master: to check that we inject sleep into +-- the master relay_send function and attempt a data modifying +-- statement in replica while it's still fetching data from the +-- master. +-- In the next two cases we try to delete a tuple while replica is +-- catching up with the master (local delete, remote delete) case +-- +-- #1: delete tuple on replica +-- box.space.test:len() --- - 1 @@ -59,7 +61,9 @@ box.space.test:get(1) ~= nil --- - false ... --- case #2: delete tuple by net.box +-- +-- case #2: delete tuple via net.box +-- --# set connection default --# set variable r_uri to 'replica.listen' c = net_box:new(r_uri) diff --git a/test/replication/catch.test.lua b/test/replication/catch.test.lua index 5ed3e89d71..4dd9299992 100644 --- a/test/replication/catch.test.lua +++ b/test/replication/catch.test.lua @@ -17,7 +17,7 @@ while box.space.test == nil do fiber.sleep(0.01) end --# set connection default --# stop server replica --- insert values on the master while replica os stopped and can't fetch them +-- insert values on the master while replica is stopped and can't fetch them for i=1,100 do s:insert{i, 'this is test message12345'} end -- sleep after every tuple @@ -26,19 +26,22 @@ errinj.set("ERRINJ_RELAY", true) --# start server replica --# set connection replica --- Check that replica doesn't enter read-write mode --- before catching up with the master: to check that we inject --- sleep into the master relay_send function and attempt a data --- modifying statement in replica while it's still fetching --- data from the master. --- In next 2 cases we try to delete tuple --- during fetching process(local delete, remote delete) --- case #1: delete tuple in replica +-- Check that replica doesn't enter read-write mode before +-- catching up with the master: to check that we inject sleep into +-- the master relay_send function and attempt a data modifying +-- statement in replica while it's still fetching data from the +-- master. +-- In the next two cases we try to delete a tuple while replica is +-- catching up with the master (local delete, remote delete) case +-- +-- #1: delete tuple on replica +-- box.space.test:len() d = box.space.test:delete{1} box.space.test:get(1) ~= nil - --- case #2: delete tuple by net.box +-- +-- case #2: delete tuple via net.box +-- --# set connection default --# set variable r_uri to 'replica.listen' c = net_box:new(r_uri) -- GitLab