From ef45ecb9e51f62868e10cf4508671a56e1e80fe0 Mon Sep 17 00:00:00 2001
From: Roman Tsisyk <roman@tsisyk.com>
Date: Wed, 9 Sep 2015 16:03:54 +0300
Subject: [PATCH] Add replicaset - a set for replicas by replication source.

Prepare for multi-master.
---
 src/box/box.cc      | 23 ++++++++++++------
 src/box/cluster.cc  | 58 ++++++++++++++++++++++++++++++++++++++++++++-
 src/box/cluster.h   | 30 +++++++++++++++++++++--
 src/box/lua/info.cc |  1 +
 src/box/replica.h   |  3 +++
 5 files changed, 105 insertions(+), 10 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index ed2e3a85d8..bbac39933c 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -206,10 +206,13 @@ box_set_replication_source(void)
 	const char *source = cfg_gets("replication_source");
 
 	/* This hook is only invoked if source has changed */
-	if (replica != NULL) {
+	struct replica *replica = cluster_replica_first();
+	while (replica != NULL) {
+		struct replica *next = cluster_replica_next(replica);
 		replica_stop(replica); /* cancels a background fiber */
+		cluster_del_replica(replica);
 		replica_delete(replica);
-		replica = NULL;
+		replica = next; /* safe iteration with cluster_del_replica */
 	}
 
 	if (source == NULL)
@@ -217,6 +220,7 @@ box_set_replication_source(void)
 
 	/* Start a new replication client using provided URI */
 	replica = replica_new(source);
+	cluster_add_replica(replica);
 	replica_start(replica, recovery); /* starts a background fiber */
 }
 
@@ -611,6 +615,7 @@ box_free(void)
 	 */
 	if (box_init_done) {
 		session_free();
+		cluster_free();
 		user_cache_free();
 		schema_free();
 		tuple_free();
@@ -682,6 +687,8 @@ box_init(void)
 	 */
 	session_init();
 
+	cluster_init();
+
 	title("loading", NULL);
 
 	/* recovery initialization */
@@ -692,6 +699,10 @@ box_init(void)
 			     cfg_geti("panic_on_snap_error"),
 			     cfg_geti("panic_on_wal_error"));
 	const char *source = cfg_gets("replication_source");
+	if (source != NULL) {
+		struct replica *replica = replica_new(source);
+		cluster_add_replica(replica);
+	}
 
 	if (recovery_has_data(recovery)) {
 		/* Tell Sophia engine LSN it must recover to. */
@@ -708,8 +719,8 @@ box_init(void)
 		/* Add a surrogate server id for snapshot rows */
 		vclock_add_server(&recovery->vclock, 0);
 
-		/* Bootstrap from a remote master */
-		replica = replica_new(source);
+		/* Bootstrap from the first master */
+		struct replica *replica = cluster_replica_first();
 		replica_start(replica, recovery);
 		replica_wait(replica); /* throws on failure */
 
@@ -747,9 +758,7 @@ box_init(void)
 
 	rmean_cleanup(rmean_box);
 
-	if (source != NULL) {
-		if (replica == NULL)
-			replica = replica_new(source);
+	cluster_foreach_replica(replica) {
 		/* Follow replica */
 		assert(recovery->writer);
 		replica_start(replica, recovery);
diff --git a/src/box/cluster.cc b/src/box/cluster.cc
index aeb21d6835..e2d120a0a8 100644
--- a/src/box/cluster.cc
+++ b/src/box/cluster.cc
@@ -39,7 +39,31 @@
  */
 tt_uuid cluster_id;
 
-struct replica *replica;
+typedef rb_tree(struct replica) replicaset_t;
+rb_proto(, replicaset_, replicaset_t, struct replica)
+
+static int
+replica_compare_by_source(const struct replica *a, const struct replica *b)
+{
+		return strcmp(a->source, b->source);
+}
+
+rb_gen(, replicaset_, replicaset_t, struct replica, link,
+       replica_compare_by_source);
+
+static replicaset_t replicaset; /* zeroed by linker */
+
+void
+cluster_init(void)
+{
+	replicaset_new(&replicaset);
+}
+
+void
+cluster_free(void)
+{
+
+}
 
 extern "C" struct vclock *
 cluster_clock()
@@ -92,3 +116,35 @@ cluster_del_server(uint32_t server_id)
 		box_set_ro(true);
 	}
 }
+
+void
+cluster_add_replica(struct replica *replica)
+{
+	replicaset_insert(&replicaset, replica);
+}
+
+void
+cluster_del_replica(struct replica *replica)
+{
+	replicaset_remove(&replicaset, replica);
+}
+
+struct replica *
+cluster_find_replica(const char *source)
+{
+	struct replica key;
+	snprintf(key.source, sizeof(key.source), "%s", source);
+	return replicaset_search(&replicaset, &key);
+}
+
+struct replica *
+cluster_replica_first(void)
+{
+	return replicaset_first(&replicaset);
+}
+
+struct replica *
+cluster_replica_next(struct replica *replica)
+{
+	return replicaset_next(&replicaset, replica);
+}
diff --git a/src/box/cluster.h b/src/box/cluster.h
index 8ac9e9ad13..9171b4342a 100644
--- a/src/box/cluster.h
+++ b/src/box/cluster.h
@@ -79,6 +79,12 @@
  * and is implemented in @file vclock.h
  */
 
+void
+cluster_init(void);
+
+void
+cluster_free(void);
+
 /** {{{ Global cluster identifier API **/
 
 /** UUID of the cluster. */
@@ -87,8 +93,6 @@ extern tt_uuid cluster_id;
 extern "C" struct vclock *
 cluster_clock();
 
-extern struct replica *replica;
-
 /* }}} */
 
 /** {{{ Cluster server id API **/
@@ -114,4 +118,26 @@ cluster_del_server(uint32_t server_id);
 
 /** }}} **/
 
+/** {{{ Cluster replica API **/
+
+void
+cluster_add_replica(struct replica *replica);
+
+void
+cluster_del_replica(struct replica *replica);
+
+struct replica *
+cluster_find_replica(const char *source);
+
+struct replica *
+cluster_replica_first(void);
+
+struct replica *
+cluster_replica_next(struct replica *replica);
+
+#define cluster_foreach_replica(var) \
+	for (struct replica *var = cluster_replica_first(); \
+	     var != NULL; var = cluster_replica_next(var))
+/** }}} **/
+
 #endif
diff --git a/src/box/lua/info.cc b/src/box/lua/info.cc
index d8230811b7..a4d46f9a5c 100644
--- a/src/box/lua/info.cc
+++ b/src/box/lua/info.cc
@@ -52,6 +52,7 @@ lbox_info_replication(struct lua_State *L)
 {
 	lua_newtable(L);
 
+	struct replica *replica = cluster_replica_first();
 	if (replica == NULL) {
 		lua_pushstring(L, "status");
 		lua_pushstring(L, "off");
diff --git a/src/box/replica.h b/src/box/replica.h
index 77962226eb..631ce44fac 100644
--- a/src/box/replica.h
+++ b/src/box/replica.h
@@ -37,6 +37,8 @@
 #include "trivia/util.h"
 #include "uri.h"
 #include "third_party/tarantool_ev.h"
+#define RB_COMPACT 1
+#include <third_party/rb.h>
 
 struct recovery_state;
 
@@ -65,6 +67,7 @@ struct replica {
 	ev_tstamp lag, last_row_time;
 	bool warning_said;
 	char source[REPLICA_SOURCE_MAXLEN];
+	rb_node(struct replica) link; /* a set by source in cluster.cc */
 	struct uri uri;
 	union {
 		struct sockaddr addr;
-- 
GitLab