From 9da4ff14b262d69285191b1650f1e1fef09d8cc7 Mon Sep 17 00:00:00 2001
From: Konstantin Osipov <kostja@tarantool.org>
Date: Thu, 3 Sep 2015 22:10:09 +0300
Subject: [PATCH] replication: allocate struct replica on heap

* allocate struct replica on heap
* identify the master in join request
  by sending back its server id
---
 src/box/box.cc     | 23 +++++++++--------------
 src/box/relay.cc   | 32 +++++++++++++++++++++-----------
 src/box/relay.h    | 25 +++++++++++++++++++++----
 src/box/replica.cc | 15 +++++++++++----
 src/box/replica.h  | 14 +++++++++-----
 5 files changed, 71 insertions(+), 38 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 364100b839..96fd828bce 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -60,8 +60,6 @@
 
 struct recovery_state *recovery;
 
-static struct replica replica_buf; /* used to store the single instance */
-
 bool snapshot_in_progress = false;
 static bool box_init_done = false;
 bool is_ro = true;
@@ -210,7 +208,7 @@ box_set_replication_source(void)
 	/* This hook is only invoked if source has changed */
 	if (replica != NULL) {
 		replica_stop(replica); /* cancels a background fiber */
-		replica_destroy(replica);
+		replica_delete(replica);
 		replica = NULL;
 	}
 
@@ -218,8 +216,7 @@ box_set_replication_source(void)
 		return;
 
 	/* Start a new replication client using provided URI */
-	replica_create(&replica_buf, source);
-	replica = &replica_buf;
+	replica = replica_new(source);
 	replica_start(replica, recovery); /* starts a background fiber */
 }
 
@@ -545,8 +542,9 @@ box_process_join(int fd, struct xrow_header *header)
 
 	assert(header->type == IPROTO_JOIN);
 
-	/* Process JOIN request via replication relay */
-	replication_join(fd, header, box_on_cluster_join);
+	/* Process JOIN request via a replication relay */
+	relay_join(fd, header, recovery->server_id,
+		   box_on_cluster_join);
 }
 
 void
@@ -556,7 +554,7 @@ box_process_subscribe(int fd, struct xrow_header *header)
 	access_check_universe(PRIV_R);
 
 	/* process SUBSCRIBE request via replication relay */
-	replication_subscribe(fd, header);
+	relay_subscribe(fd, header);
 }
 
 /** Replace the current server id in _cluster */
@@ -699,8 +697,7 @@ box_init(void)
 		vclock_add_server(&recovery->vclock, 0);
 
 		/* Bootstrap from replica */
-		replica_create(&replica_buf, source);
-		replica = &replica_buf;
+		replica = replica_new(source);
 		replica_start(replica, recovery);
 		replica_join(replica); /* throws on failure */
 
@@ -739,10 +736,8 @@ box_init(void)
 	rmean_cleanup(rmean_box);
 
 	if (source != NULL) {
-		if (replica == NULL) {
-			replica_create(&replica_buf, source);
-			replica = &replica_buf;
-		} /* else re-use instance from bootstrap */
+		if (replica == NULL)
+			replica = replica_new(source);
 		/* Follow replica */
 		assert(recovery->writer);
 		replica_start(replica, recovery);
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 0f28540b56..edf0ae3601 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -80,7 +80,7 @@ relay_set_cord_name(int fd)
 }
 
 void
-replication_join_f(va_list ap)
+relay_join_f(va_list ap)
 {
 	Relay *relay = va_arg(ap, Relay *);
 
@@ -93,8 +93,9 @@ replication_join_f(va_list ap)
 }
 
 void
-replication_join(int fd, struct xrow_header *packet,
-		 void (*on_join)(const struct tt_uuid *))
+relay_join(int fd, struct xrow_header *packet,
+	   uint32_t server_id,
+	   void (*on_join)(const struct tt_uuid *))
 {
 	Relay relay(fd, packet->sync);
 	struct recovery_state *r = relay.r;
@@ -102,19 +103,28 @@ replication_join(int fd, struct xrow_header *packet,
 	struct tt_uuid server_uuid = uuid_nil;
 	xrow_decode_join(packet, &server_uuid);
 
-	cord_costart(&relay.cord, "join", replication_join_f, &relay);
+	cord_costart(&relay.cord, "join", relay_join_f, &relay);
 	cord_cojoin(&relay.cord);
 	/**
 	 * Call the server-side hook which stores the replica uuid
-	 * in _cluster hook after sending the last row but before
+	 * in _cluster space after sending the last row but before
 	 * sending OK - if the hook fails, the error reaches the
 	 * client.
 	 */
 	on_join(&server_uuid);
 
-	/* Send response to JOIN command = end of stream */
+	/*
+	 * Send a response to JOIN request, an indicator of the
+	 * end of the stream of snapshot rows.
+	 */
 	struct xrow_header row;
 	xrow_encode_vclock(&row, vclockset_last(&r->snap_dir.index));
+	/*
+	 * Identify the message with the server id of this
+	 * server, this is the only way for a replica to find
+	 * out the id of the server it has connected to.
+	 */
+	row.server_id = server_id;
 	relay_send(&relay, &row);
 }
 
@@ -130,7 +140,7 @@ feed_event_f(struct trigger *trigger, void * /* event */)
  * its socket, and we get an EOF.
  */
 static void
-replication_subscribe_f(va_list ap)
+relay_subscribe_f(va_list ap)
 {
 	Relay *relay = va_arg(ap, Relay *);
 	struct recovery_state *r = relay->r;
@@ -187,7 +197,7 @@ replication_subscribe_f(va_list ap)
 
 /** Replication acceptor fiber handler. */
 void
-replication_subscribe(int fd, struct xrow_header *packet)
+relay_subscribe(int fd, struct xrow_header *packet)
 {
 	Relay relay(fd, packet->sync);
 
@@ -209,14 +219,14 @@ replication_subscribe(int fd, struct xrow_header *packet)
 
 	/* Check server uuid */
 	r->server_id = schema_find_id(BOX_CLUSTER_ID, 1,
-				   tt_uuid_str(&server_uuid), UUID_STR_LEN);
+				      tt_uuid_str(&server_uuid), UUID_STR_LEN);
 	if (r->server_id == BOX_ID_NIL) {
 		tnt_raise(ClientError, ER_UNKNOWN_SERVER,
 			  tt_uuid_str(&server_uuid));
 	}
 
 	struct cord cord;
-	cord_costart(&cord, "subscribe", replication_subscribe_f, &relay);
+	cord_costart(&cord, "subscribe", relay_subscribe_f, &relay);
 	cord_cojoin(&cord);
 }
 
@@ -246,7 +256,7 @@ relay_send_row(struct recovery_state *r, void *param,
 	 * (JOIN request). In this case, send every row.
 	 * Otherwise, we're feeding a WAL, thus responding to
 	 * SUBSCRIBE request. In that case, only send a row if
-	 * it not from the same server (i.e. don't send
+	 * it is not from the same server (i.e. don't send
 	 * replica's own rows back).
 	 */
 	if (packet->server_id == 0 || packet->server_id != r->server_id)
diff --git a/src/box/relay.h b/src/box/relay.h
index 8f50f81247..050be5bba3 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -50,17 +50,34 @@ class Relay {
 	~Relay();
 };
 
+/**
+ * Send an initial snapshot to the replica,
+ * register the replica UUID in _cluster
+ * space, end the row with OK packet.
+ *
+ * @param fd        client connection
+ * @param packet    incoming JOIN request
+ *                  packet
+ * @param server_id server_id of this server
+ *                  to send to the replica
+ * @param on_join   the hook to invoke when
+ *                  the snapshot is sent
+ *                  to the replica - it
+ *                  registers the replica with
+ *                  the cluster.
+ */
 void
-replication_join(int fd, struct xrow_header *packet,
-		 void (*on_join)(const struct tt_uuid *));
+relay_join(int fd, struct xrow_header *packet,
+	   uint32_t server_id,
+	   void (*on_join)(const struct tt_uuid *));
 
 /**
  * Subscribe a replica to updates.
  *
- * @return None. On error, closes the socket.
+ * @return none.
  */
 void
-replication_subscribe(int fd, struct xrow_header *packet);
+relay_subscribe(int fd, struct xrow_header *packet);
 
 void
 relay_send(Relay *relay, struct xrow_header *packet);
diff --git a/src/box/replica.cc b/src/box/replica.cc
index 33623f4113..2c748f34b2 100644
--- a/src/box/replica.cc
+++ b/src/box/replica.cc
@@ -383,10 +383,15 @@ replica_join(struct replica *replica)
 	fiber_join(replica->reader); /* may throw */
 }
 
-void
-replica_create(struct replica *replica, const char *uri)
+struct replica *
+replica_new(const char *uri)
 {
-	memset(replica, 0, sizeof(*replica));
+	struct replica *replica = (struct replica *)
+		calloc(1, sizeof(struct replica));
+	if (replica == NULL) {
+		tnt_raise(OutOfMemory, sizeof(*replica), "malloc",
+			  "struct replica");
+	}
 	replica->io.fd = -1;
 
 	/* uri_parse() sets pointers to replica->source buffer */
@@ -395,11 +400,13 @@ replica_create(struct replica *replica, const char *uri)
 	/* URI checked by box_check_replication_source() */
 	assert(rc == 0 && replica->uri.service != NULL);
 	(void) rc;
+	return replica;
 }
 
 void
-replica_destroy(struct replica *replica)
+replica_delete(struct replica *replica)
 {
 	assert(replica->reader == NULL);
 	evio_close(loop(), &replica->io);
+	free(replica);
 }
diff --git a/src/box/replica.h b/src/box/replica.h
index 470527ebf3..95e061a727 100644
--- a/src/box/replica.h
+++ b/src/box/replica.h
@@ -110,15 +110,19 @@ void
 replica_join(struct replica *replica);
 
 /**
- * Create replica and initialize remote uri (copied to struct replica).
+ * Allocate an instance of replica object, create replica and initialize
+ * remote uri (copied to struct replica).
+ *
+ * @pre     the uri is a valid and checked one
+ * @error   throws OutOfMemory exception if out of memory.
  */
-void
-replica_create(struct replica *replica, const char *uri);
+struct replica *
+replica_new(const char *uri);
 
 /**
- * Destroy replica.
+ * Destroy and delete a replica.
  */
 void
-replica_destroy(struct replica *replica);
+replica_delete(struct replica *replica);
 
 #endif /* TARANTOOL_REPLICA_H_INCLUDED */
-- 
GitLab