From c419cab7d4d66d7fab5ea424d0807cd3bbbf0b51 Mon Sep 17 00:00:00 2001
From: Serge Petrenko <sergepetrenko@tarantool.org>
Date: Wed, 18 Jan 2023 18:01:40 +0300
Subject: [PATCH] replication: add bootstrap_strategy "supervised"

This commit adds another possible bootstrap_strategy to accompany "auto"
and "config": "supervised".

Such a strategy may be useful to pin the desired bootstrap leader on an
active cluster (so that the user may join the replicas from the desired
node without changing their box.cfg) or to manually set the bootstrap
leader among the nodes that managed to start without issues.

More details are in the docbot request.

Closes #8509

@TarantoolBot document
Title: new bootstrap strategy - "supervised"

The `bootstrap_strategy` configuration option may now be set to
"supervised".

This strategy works as follows:
When bootstrapping a new replicaset, the nodes do not choose a bootstrap
leader automatically and instead wait for it to be appointed by the
user. The configuration will fail if no bootstrap leader is appointed
during a `replication_connect_timeout`.

In order to appoint a bootstrap leader, the user has to issue a
`box.ctl.make_bootstrap_leader()` call on the desired node. This must be
done during the initial `box.cfg()` call.

Possible ways to achieve this are:

1. In interactive mode:
```lua
fiber.create(box.cfg, desired_config)
box.ctl.make_bootstrap_leader()
```
2. Via an init script:
```lua
-- Init script:
console.listen(admin_port)
box.cfg(desired_config)
-- User console:
tarantoolctl enter admin port
> box.ctl.make_bootstrap_leader()
```

When joining a new replica with `bootstrap_strategy` = "supervised" to
an existing replica set, the replica will not choose the bootstrap
leader automatically, but will instead join the node on which
`box.ctl.make_bootstrap_leader()` was issued last. In case such a node
isn't found after a `replication_connect_timeout`, the configuration
fails.
---
 .../gh-8509-bootstrap-strategy-supervised.md  |  18 ++
 src/box/alter.cc                              |  89 ++++++-
 src/box/alter.h                               |   2 +
 src/box/box.cc                                |  55 +++-
 src/box/box.h                                 |  13 +
 src/box/lua/ctl.c                             |   9 +
 src/box/replication.cc                        |  62 ++++-
 src/box/replication.h                         |   1 +
 src/box/schema.cc                             |   3 +
 .../bootstrap_strategy_test.lua               | 249 ++++++++++++++++++
 10 files changed, 483 insertions(+), 18 deletions(-)
 create mode 100644 changelogs/unreleased/gh-8509-bootstrap-strategy-supervised.md

diff --git a/changelogs/unreleased/gh-8509-bootstrap-strategy-supervised.md b/changelogs/unreleased/gh-8509-bootstrap-strategy-supervised.md
new file mode 100644
index 0000000000..931c352b9a
--- /dev/null
+++ b/changelogs/unreleased/gh-8509-bootstrap-strategy-supervised.md
@@ -0,0 +1,18 @@
+## feature/replication
+
+* You may now control which node new replicas choose as a bootstrap leader
+  without touching node config. To do so, set `box.cfg.bootstrap_strategy` to
+  `'supervised'`, and the nodes will only bootstrap off the node on which you
+  called `box.ctl.make_bootstrap_leader()` last.
+  This works on an empty replica set bootstrap as well: start the admin console
+  before configuring the nodes. Then configure the nodes:
+  ```lua
+  box.cfg{
+      bootstrap_strategy = 'supervised',
+      replication = ...,
+      listen = ...,
+  }
+  ```
+  Finally, call `box.ctl.make_bootstrap_leader()` through the admin console
+  on the node you want to promote. All the nodes will bootstrap off that node
+  (gh-8509).
diff --git a/src/box/alter.cc b/src/box/alter.cc
index 62f9ecc68f..c81f35462a 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -3878,6 +3878,78 @@ on_replace_dd_priv(struct trigger * /* trigger */, void *event)
 
 /* {{{ cluster configuration */
 
+/**
+ * This trigger implements the "last write wins" strategy for
+ * "bootstrap_leader_uuid" tuple of space _schema. Comparison is performed by
+ * a timestamp and replica_id of the node which authored the change.
+ */
+static int
+before_replace_dd_schema(struct trigger * /* trigger */, void *event)
+{
+	struct txn *txn = (struct txn *)event;
+	struct txn_stmt *stmt = txn_current_stmt(txn);
+	struct tuple *old_tuple = stmt->old_tuple;
+	struct tuple *new_tuple = stmt->new_tuple;
+	const char *key = tuple_field_cstr(new_tuple != NULL ?
+					   new_tuple : old_tuple,
+					   BOX_SCHEMA_FIELD_KEY);
+	if (key == NULL)
+		return -1;
+	if (strcmp(key, "bootstrap_leader_uuid") == 0) {
+		uint64_t old_ts = 0;
+		uint32_t old_id = 0;
+		uint64_t new_ts = UINT64_MAX;
+		uint32_t new_id = UINT32_MAX;
+		int ts_field = BOX_SCHEMA_FIELD_VALUE + 1;
+		int id_field = BOX_SCHEMA_FIELD_VALUE + 2;
+		/*
+		 * Assume anything can be stored in old_tuple, so do not require
+		 * it to have a timestamp or replica_id. In contrast to that,
+		 * always require new tuple to have a valid timestamp and
+		 * replica_id.
+		 */
+		if (old_tuple != NULL) {
+			const char *field = tuple_field(old_tuple, ts_field);
+			if (field != NULL && mp_typeof(*field) == MP_UINT)
+				old_ts = mp_decode_uint(&field);
+			field = tuple_field(old_tuple, id_field);
+			if (field != NULL && mp_typeof(*field) == MP_UINT)
+				old_id = mp_decode_uint(&field);
+		}
+		if (new_tuple != NULL &&
+		    (tuple_field_u64(new_tuple, ts_field, &new_ts) != 0 ||
+		     tuple_field_u32(new_tuple, id_field, &new_id) != 0)) {
+			return -1;
+		}
+		if (new_ts < old_ts || (new_ts == old_ts && new_id < old_id)) {
+			say_info("Ignore the replace of tuple %s with %s in "
+				 "space _schema: the former has a newer "
+				 "timestamp", tuple_str(old_tuple),
+				 tuple_str(new_tuple));
+			goto return_old;
+		}
+	}
+	return 0;
+return_old:
+	if (new_tuple != NULL)
+		tuple_unref(new_tuple);
+	if (old_tuple != NULL)
+		tuple_ref(old_tuple);
+	stmt->new_tuple = old_tuple;
+	return 0;
+}
+
+/** An on_commit trigger to update bootstrap leader uuid. */
+static int
+on_commit_schema_set_bootstrap_leader_uuid(struct trigger *trigger, void *event)
+{
+	(void)event;
+	struct tt_uuid *uuid = (struct tt_uuid *)trigger->data;
+	bootstrap_leader_uuid = *uuid;
+	box_broadcast_ballot();
+	return 0;
+}
+
 /**
  * This trigger is invoked only upon initial recovery, when
  * reading contents of the system spaces from the snapshot.
@@ -3895,7 +3967,7 @@ on_replace_dd_schema(struct trigger * /* trigger */, void *event)
 	struct tuple *old_tuple = stmt->old_tuple;
 	struct tuple *new_tuple = stmt->new_tuple;
 	const char *key = tuple_field_cstr(new_tuple ? new_tuple : old_tuple,
-					      BOX_SCHEMA_FIELD_KEY);
+					   BOX_SCHEMA_FIELD_KEY);
 	if (key == NULL)
 		return -1;
 	if (strcmp(key, "cluster") == 0) {
@@ -3929,6 +4001,20 @@ on_replace_dd_schema(struct trigger * /* trigger */, void *event)
 			 */
 			dd_version_id = tarantool_version_id();
 		}
+	} else if (strcmp(key, "bootstrap_leader_uuid") == 0) {
+		struct tt_uuid *uuid = xregion_alloc_object(&txn->region,
+							    typeof(*uuid));
+		if (!new_tuple) {
+			*uuid = uuid_nil;
+		} else if (tuple_field_uuid(new_tuple, BOX_SCHEMA_FIELD_VALUE,
+					    uuid) != 0) {
+			return -1;
+		}
+		struct trigger *on_commit = txn_alter_trigger_new(
+			on_commit_schema_set_bootstrap_leader_uuid, uuid);
+		if (on_commit == NULL)
+			return -1;
+		txn_on_commit(txn, on_commit);
 	}
 	return 0;
 }
@@ -4863,6 +4949,7 @@ TRIGGER(alter_space_on_replace_space, on_replace_dd_space);
 TRIGGER(alter_space_on_replace_index, on_replace_dd_index);
 TRIGGER(on_replace_truncate, on_replace_dd_truncate);
 TRIGGER(on_replace_schema, on_replace_dd_schema);
+TRIGGER(before_replace_schema, before_replace_dd_schema);
 TRIGGER(on_replace_user, on_replace_dd_user);
 TRIGGER(on_replace_func, on_replace_dd_func);
 TRIGGER(on_replace_collation, on_replace_dd_collation);
diff --git a/src/box/alter.h b/src/box/alter.h
index 592a91fe9d..a783c79b9d 100644
--- a/src/box/alter.h
+++ b/src/box/alter.h
@@ -32,6 +32,8 @@
  */
 #include "trigger.h"
 
+extern struct trigger before_replace_schema;
+
 extern struct trigger alter_space_on_replace_space;
 extern struct trigger alter_space_on_replace_index;
 extern struct trigger on_replace_truncate;
diff --git a/src/box/box.cc b/src/box/box.cc
index 48f025d895..5d9d1a1f08 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -114,12 +114,7 @@ const char *box_auth_type;
 
 const char *box_ballot_event_key = "internal.ballot";
 
-/**
- * UUID of the node this instance bootstrapped from.
- * Is only set during the bootstrap and left untouched during automatic
- * rebootstrap and anonymous replica register.
- */
-static struct tt_uuid bootstrap_leader_uuid;
+struct tt_uuid bootstrap_leader_uuid;
 
 /**
  * Set if backup is in progress, i.e. box_backup_start() was
@@ -1082,9 +1077,11 @@ box_check_bootstrap_strategy(void)
 		return BOOTSTRAP_STRATEGY_LEGACY;
 	if (strcmp(strategy, "config") == 0)
 		return BOOTSTRAP_STRATEGY_CONFIG;
+	if (strcmp(strategy, "supervised") == 0)
+		return BOOTSTRAP_STRATEGY_SUPERVISED;
 	diag_set(ClientError, ER_CFG, "bootstrap_strategy",
 		 "the value should be one of the following: "
-		 "'auto', 'config', 'legacy'");
+		 "'auto', 'config', 'supervised', 'legacy'");
 	return BOOTSTRAP_STRATEGY_INVALID;
 }
 
@@ -1838,6 +1835,48 @@ box_set_bootstrap_leader(void)
 					  &cfg_bootstrap_leader_uuid);
 }
 
+/** Persist this instance as the bootstrap leader in _schema space. */
+static int
+box_set_bootstrap_leader_record(void)
+{
+	assert(instance_id != REPLICA_ID_NIL);
+	assert(!tt_uuid_is_nil(&INSTANCE_UUID));
+	return boxk(IPROTO_REPLACE, BOX_SCHEMA_ID,
+		    "[%s%s%" PRIu64 "%" PRIu32 "]", "bootstrap_leader_uuid",
+		    tt_uuid_str(&INSTANCE_UUID), fiber_time64(), instance_id);
+}
+
+int
+box_make_bootstrap_leader(void)
+{
+	if (tt_uuid_is_nil(&INSTANCE_UUID)) {
+		diag_set(ClientError, ER_UNSUPPORTED,
+			 "box.ctl.make_bootstrap_leader()",
+			 "promoting this instance before box.cfg() is called");
+		return -1;
+	}
+	/* Bootstrap strategy is read by the time instance uuid is known. */
+	assert(bootstrap_strategy != BOOTSTRAP_STRATEGY_INVALID);
+	if (bootstrap_strategy != BOOTSTRAP_STRATEGY_SUPERVISED) {
+		diag_set(ClientError, ER_UNSUPPORTED,
+			 tt_sprintf("bootstrap_strategy = '%s'",
+				    cfg_gets("bootstrap_strategy")),
+			 "promoting the bootstrap leader via "
+			 "box.ctl.make_bootstrap_leader()");
+		return -1;
+	}
+	if (is_box_configured) {
+		if (box_check_writable() != 0)
+			return -1;
+		/* Ballot broadcast will happen in an on_commit trigger. */
+		return box_set_bootstrap_leader_record();
+	} else {
+		bootstrap_leader_uuid = INSTANCE_UUID;
+		box_broadcast_ballot();
+		return 0;
+	}
+}
+
 void
 box_set_replication_sync_lag(void)
 {
@@ -4301,6 +4340,8 @@ bootstrap_master(const struct tt_uuid *replicaset_uuid)
 
 	/* Set UUID of a new replica set */
 	box_set_replicaset_uuid(replicaset_uuid);
+	if (bootstrap_strategy == BOOTSTRAP_STRATEGY_SUPERVISED)
+		box_set_bootstrap_leader_record();
 
 	/* Enable WAL subsystem. */
 	if (wal_enable() != 0)
diff --git a/src/box/box.h b/src/box/box.h
index 4fa637fba1..1f8f44c648 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -91,6 +91,13 @@ extern double txn_timeout_default;
 /** "internal.ballot" built-in event key. */
 extern const char *box_ballot_event_key;
 
+/**
+ * UUID of the node this instance considers the bootstrap leader. Is broadcast
+ * to replicas via the ballot and influences the replica's choice of the
+ * bootstrap leader.
+ */
+extern struct tt_uuid bootstrap_leader_uuid;
+
 /*
  * Initialize box library
  * @throws C++ exception
@@ -368,6 +375,12 @@ box_iterator_position_unpack(const char *packed_pos,
 			     uint32_t key_part_count, int iterator,
 			     const char **pos, const char **pos_end);
 
+/**
+ * For bootstrap_strategy = "supervised", set this node as the bootstrap leader.
+ */
+int
+box_make_bootstrap_leader(void);
+
 /**
  * Select data, satisfying filters (key and iterator), and dump it to port.
  * If packed_pos is not NULL and *packed_pos is not NULL, selection begins
diff --git a/src/box/lua/ctl.c b/src/box/lua/ctl.c
index ad075fc7ba..75cb4d3fbd 100644
--- a/src/box/lua/ctl.c
+++ b/src/box/lua/ctl.c
@@ -117,6 +117,14 @@ lbox_ctl_demote(struct lua_State *L)
 	return 0;
 }
 
+static int
+lbox_ctl_make_bootstrap_leader(struct lua_State *L)
+{
+	if (box_make_bootstrap_leader() != 0)
+		return luaT_error(L);
+	return 0;
+}
+
 static int
 lbox_ctl_is_recovery_finished(struct lua_State *L)
 {
@@ -158,6 +166,7 @@ static const struct luaL_Reg lbox_ctl_lib[] = {
 	/* An old alias. */
 	{"clear_synchro_queue", lbox_ctl_promote},
 	{"demote", lbox_ctl_demote},
+	{"make_bootstrap_leader", lbox_ctl_make_bootstrap_leader},
 	{"is_recovery_finished", lbox_ctl_is_recovery_finished},
 	{"set_on_shutdown_timeout", lbox_ctl_set_on_shutdown_timeout},
 	{NULL, NULL}
diff --git a/src/box/replication.cc b/src/box/replication.cc
index deb7da5800..ebd85be380 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -142,6 +142,7 @@ replicaset_connect_quorum(int total)
 	case BOOTSTRAP_STRATEGY_AUTO:
 		return replicaset_connect_quorum_auto(total);
 	case BOOTSTRAP_STRATEGY_CONFIG:
+	case BOOTSTRAP_STRATEGY_SUPERVISED:
 		/*
 		 * During the replica set bootstrap and join we don't care about
 		 * connected node count, we only care about reaching the
@@ -178,6 +179,7 @@ replicaset_sync_quorum(void)
 			   replicaset.applier.total);
 	case BOOTSTRAP_STRATEGY_AUTO:
 	case BOOTSTRAP_STRATEGY_CONFIG:
+	case BOOTSTRAP_STRATEGY_SUPERVISED:
 		return replication_sync_quorum_auto;
 	default:
 		unreachable();
@@ -910,6 +912,7 @@ applier_on_connect_f(struct trigger *trigger, void *event)
 	struct replicaset_connect_state *state = on_connect->state;
 	struct applier *applier = (struct applier *)event;
 
+	fiber_cond_signal(&state->wakeup);
 	if (on_connect->seen_state == applier->state)
 		return 0;
 	on_connect->seen_state = applier->state;
@@ -918,7 +921,6 @@ applier_on_connect_f(struct trigger *trigger, void *event)
 	    applier->state != APPLIER_CONNECTED) {
 		return 0;
 	}
-	fiber_cond_signal(&state->wakeup);
 	applier_pause(applier);
 	return 0;
 }
@@ -927,15 +929,19 @@ applier_on_connect_f(struct trigger *trigger, void *event)
 static bool
 applier_is_bootstrap_leader(const struct applier *applier)
 {
-	if (bootstrap_strategy != BOOTSTRAP_STRATEGY_CONFIG)
-		return false;
-	if (!tt_uuid_is_nil(&cfg_bootstrap_leader_uuid)) {
-		return tt_uuid_compare(&applier->uuid,
-				       &cfg_bootstrap_leader_uuid) == 0;
-	}
-	if (!uri_is_nil(&cfg_bootstrap_leader_uri)) {
-		return uri_addr_is_equal(&applier->uri,
-					 &cfg_bootstrap_leader_uri);
+	assert(!tt_uuid_is_nil(&applier->uuid));
+	if (bootstrap_strategy == BOOTSTRAP_STRATEGY_CONFIG) {
+		if (!tt_uuid_is_nil(&cfg_bootstrap_leader_uuid)) {
+			return tt_uuid_is_equal(&applier->uuid,
+						&cfg_bootstrap_leader_uuid);
+		}
+		if (!uri_is_nil(&cfg_bootstrap_leader_uri)) {
+			return uri_addr_is_equal(&applier->uri,
+						 &cfg_bootstrap_leader_uri);
+		}
+	} else if (bootstrap_strategy == BOOTSTRAP_STRATEGY_SUPERVISED) {
+		return tt_uuid_is_equal(&applier->ballot.bootstrap_leader_uuid,
+					&applier->uuid);
 	}
 	return false;
 }
@@ -965,6 +971,18 @@ replicaset_is_connected(struct replicaset_connect_state *state,
 {
 	if (replicaset_state == REPLICASET_BOOTSTRAP ||
 	    replicaset_state == REPLICASET_JOIN) {
+		/*
+		 * With "supervised" strategy we may continue only once the
+		 * bootstrap leader appears.
+		 */
+		if (bootstrap_strategy == BOOTSTRAP_STRATEGY_SUPERVISED)
+			return bootstrap_leader_is_connected(appliers, count);
+		/*
+		 * With "config" strategy we may continue either when the leader
+		 * appears or when there are no more connections to wait for.
+		 * If none of them is the configured bootstrap_leader, there's
+		 * no point to wait for one.
+		 */
 		if (bootstrap_strategy == BOOTSTRAP_STRATEGY_CONFIG &&
 		    bootstrap_leader_is_connected(appliers, count)) {
 			return true;
@@ -1421,6 +1439,28 @@ replicaset_find_join_master_cfg(void)
 	return leader;
 }
 
+/**
+ * Out of all the replicas find the one which was promoted via
+ * box.ctl.make_bootstrap_leader()
+ */
+static struct replica *
+replicaset_find_join_master_supervised(void)
+{
+	struct replica *leader = NULL;
+	replicaset_foreach(replica) {
+		struct applier *applier = replica->applier;
+		if (applier == NULL)
+			continue;
+		if (applier_is_bootstrap_leader(applier))
+			leader = replica;
+	}
+	if (leader == NULL) {
+		tnt_raise(ClientError, ER_CFG, "bootstrap_strategy",
+			  "failed to connect to the bootstrap leader");
+	}
+	return leader;
+}
+
 struct replica *
 replicaset_find_join_master(void)
 {
@@ -1430,6 +1470,8 @@ replicaset_find_join_master(void)
 		return replicaset_find_join_master_auto();
 	case BOOTSTRAP_STRATEGY_CONFIG:
 		return replicaset_find_join_master_cfg();
+	case BOOTSTRAP_STRATEGY_SUPERVISED:
+		return replicaset_find_join_master_supervised();
 	default:
 		unreachable();
 	}
diff --git a/src/box/replication.h b/src/box/replication.h
index 9f1014de78..333b34381b 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -105,6 +105,7 @@ enum bootstrap_strategy {
 	BOOTSTRAP_STRATEGY_AUTO,
 	BOOTSTRAP_STRATEGY_LEGACY,
 	BOOTSTRAP_STRATEGY_CONFIG,
+	BOOTSTRAP_STRATEGY_SUPERVISED,
 };
 
 /** Instance's bootstrap strategy. Controls replication reconfiguration. */
diff --git a/src/box/schema.cc b/src/box/schema.cc
index 94a487b855..bbd6283421 100644
--- a/src/box/schema.cc
+++ b/src/box/schema.cc
@@ -222,6 +222,9 @@ schema_init(void)
 	key_parts[0].type = FIELD_TYPE_STRING;
 	sc_space_new(BOX_SCHEMA_ID, "_schema", key_parts, 1,
 		     &on_replace_schema);
+	struct space *schema = space_by_id(BOX_SCHEMA_ID);
+	assert(schema != NULL);
+	trigger_add(&schema->before_replace, &before_replace_schema);
 
 	/* _collation - collation description. */
 	key_parts[0].fieldno = 0;
diff --git a/test/replication-luatest/bootstrap_strategy_test.lua b/test/replication-luatest/bootstrap_strategy_test.lua
index 2eedff9135..53d455c380 100644
--- a/test/replication-luatest/bootstrap_strategy_test.lua
+++ b/test/replication-luatest/bootstrap_strategy_test.lua
@@ -2,6 +2,8 @@ local t = require('luatest')
 local server = require('luatest.server')
 local replica_set = require('luatest.replica_set')
 local fio = require('fio')
+local fiber = require('fiber')
+local socket = require('socket')
 
 local g_auto = t.group('gh-5272-bootstrap-strategy-auto')
 
@@ -446,3 +448,250 @@ g_config_fail.test_bad_uri_or_uuid = function()
         })
     end
 end
+
+local g_supervised = t.group('gh-8509-bootstrap-strategy-supervised')
+
+local server2_admin
+local SOCKET_TIMEOUT = 5
+
+local function make_bootstrap_leader_initial(sockname)
+    local sock, err = socket.tcp_connect('unix/', sockname)
+    t.assert_equals(err, nil, 'Connection successful')
+    local greeting = sock:read(128, SOCKET_TIMEOUT)
+    t.assert_str_contains(greeting, 'Tarantool', 'Connected to console')
+    t.assert_str_contains(greeting, 'Lua console', 'Connected to console')
+    sock:write('box.ctl.make_bootstrap_leader()\n', SOCKET_TIMEOUT)
+    local response = sock:read(8, SOCKET_TIMEOUT)
+    t.assert_equals(response, '---\n...\n', 'The call succeeded')
+    sock:close()
+end
+
+g_supervised.after_each(function(cg)
+    cg.replica_set:drop()
+end)
+
+g_supervised.before_test('test_bootstrap', function(cg)
+    cg.replica_set = replica_set:new{}
+    cg.box_cfg = {
+        bootstrap_strategy = 'supervised',
+        replication = {
+            server.build_listen_uri('server1', cg.replica_set.id),
+            server.build_listen_uri('server2', cg.replica_set.id),
+        },
+        replication_timeout = 0.1,
+    }
+    for i = 1, 2 do
+        local alias = 'server' .. i
+        cg[alias] = cg.replica_set:build_and_add_server{
+            alias = alias,
+            box_cfg = cg.box_cfg,
+        }
+    end
+    cg.server1.box_cfg.instance_uuid = uuid1
+    cg.server2.box_cfg.instance_uuid = uuid2
+    server2_admin = fio.pathjoin(cg.server2.workdir, 'server2.admin')
+    local run_before_cfg = string.format([[
+        local console = require('console')
+        console.listen('unix/:%s')
+    ]], server2_admin)
+
+    cg.server2.env.TARANTOOL_RUN_BEFORE_BOX_CFG = run_before_cfg
+end)
+
+g_supervised.test_bootstrap = function(cg)
+    cg.replica_set:start{wait_until_ready = false}
+    t.helpers.retrying({}, make_bootstrap_leader_initial, server2_admin)
+    cg.server1:wait_until_ready()
+    cg.server2:wait_until_ready()
+    t.assert_equals(cg.server2:get_instance_id(), 1,
+                    'Server 2 is the bootstrap leader');
+    cg.server2:exec(function()
+        local tup = box.space._schema:get{'bootstrap_leader_uuid'}
+        t.assert(tup ~= nil, 'Bootstrap leader uuid is persisted')
+        t.assert_equals(tup[2], box.info.uuid,
+                        'Bootstrap leader uuid is correct')
+    end)
+    t.helpers.retrying({}, cg.server1.assert_follows_upstream, cg.server1, 1)
+
+    cg.server3 = cg.replica_set:build_and_add_server{
+        alias = 'server3',
+        box_cfg = cg.box_cfg,
+    }
+    cg.server3:start()
+    local query = string.format('bootstrapping replica from %s',
+                                uuid2:gsub('%-', '%%-'))
+    t.assert(cg.server3:grep_log(query), 'Server3 bootstrapped from server2')
+
+    cg.server1:exec(function()
+        box.ctl.make_bootstrap_leader()
+        local tup = box.space._schema:get{'bootstrap_leader_uuid'}
+        t.assert_equals(tup[2], box.info.uuid,
+                        'Bootstrap leader is updated')
+    end)
+    cg.server4 = cg.replica_set:build_and_add_server{
+        alias = 'server4',
+        box_cfg = cg.box_cfg,
+    }
+    cg.server4:start()
+    query = string.format('bootstrapping replica from %s',
+                          uuid1:gsub('%-', '%%-'))
+    t.assert(cg.server4:grep_log(query), 'Server4 bootstrapped from server1')
+
+end
+
+g_supervised.before_test('test_schema_triggers', function(cg)
+    cg.replica_set = replica_set:new{}
+    cg.server1 = cg.replica_set:build_and_add_server{alias = 'server1'}
+end)
+
+g_supervised.test_schema_triggers = function(cg)
+    cg.replica_set:start{}
+    cg.server1:exec(function()
+        local uuid2 = '22222222-2222-2222-2222-222222222222'
+        local uuid3 = '33333333-3333-3333-3333-333333333333'
+        box.cfg{bootstrap_strategy = 'supervised'}
+        box.ctl.make_bootstrap_leader()
+        local old_tuple = box.space._schema:get{'bootstrap_leader_uuid'}
+        local new_tuple = old_tuple:update{{'=', 2, 'not a uuid'}}
+        t.assert_error_msg_contains('Invalid UUID', function()
+            box.space._schema:replace(new_tuple)
+        end)
+        new_tuple = old_tuple:update{{'=', 3, 'not a timestamp'}}
+        t.assert_error_msg_contains('expected unsigned, got string', function()
+            box.space._schema:replace(new_tuple)
+        end)
+        new_tuple = old_tuple:update{{'=', 4, 'not a replica id'}}
+        t.assert_error_msg_contains('expected unsigned, got string', function()
+            box.space._schema:replace(new_tuple)
+        end)
+        new_tuple = old_tuple:update{{'-', 3, 1}}
+        box.space._schema:replace(new_tuple)
+        t.assert_equals(box.space._schema:get{'bootstrap_leader_uuid'},
+                        old_tuple, 'Last write wins by timestamp - old tuple')
+        new_tuple = old_tuple:update{{'-', 4, 1}}
+        box.space._schema:replace(new_tuple)
+        t.assert_equals(box.space._schema:get{'bootstrap_leader_uuid'},
+                        old_tuple, 'Last write wins by replica id - old tuple')
+        new_tuple = old_tuple:update{{'+', 3, 1}}
+        box.space._schema:replace(new_tuple)
+        t.assert_equals(box.space._schema:get{'bootstrap_leader_uuid'},
+                        new_tuple, 'Last write wins by timestamp - new tuple')
+        old_tuple = new_tuple
+        new_tuple = old_tuple:update{{'+', 4, 1}}
+        box.space._schema:replace(new_tuple)
+        t.assert_equals(box.space._schema:get{'bootstrap_leader_uuid'},
+                        new_tuple, 'Last write wins by replica id - new tuple')
+
+        local ballot_uuid
+        local watcher = box.watch('internal.ballot', function(_, ballot)
+                local ballot_key = box.iproto.key.BALLOT
+                local uuid_key = box.iproto.ballot_key.BOOTSTRAP_LEADER_UUID
+                ballot_uuid = ballot[ballot_key][uuid_key]
+        end)
+        t.helpers.retrying({}, function()
+            t.assert_equals(ballot_uuid, new_tuple[2],
+                            'Ballot stores correct uuid')
+        end)
+        old_tuple = new_tuple
+        new_tuple = old_tuple:update{{'=', 2, uuid2}, {'-', 3, 1}}
+        box.space._schema:replace(new_tuple)
+        t.assert_equals(ballot_uuid, old_tuple[2],
+                        "Ballot isn't updated if the tuple is rejected")
+        new_tuple = new_tuple:update{{'+', 3, 1}}
+        box.space._schema:replace(new_tuple)
+        t.helpers.retrying({}, function()
+            t.assert_equals(ballot_uuid, new_tuple[2],
+                            'Ballot updates the uuid')
+        end)
+        old_tuple = new_tuple
+        new_tuple = new_tuple:update{{'=', 2, uuid3}}
+        box.begin()
+        box.space._schema:replace(new_tuple)
+        local new_uuid = ballot_uuid
+        box.commit()
+        t.assert_equals(new_uuid, old_tuple[2],
+                        "Ballot isn't updated before commit")
+        t.helpers.retrying({}, function()
+            t.assert_equals(ballot_uuid, new_tuple[2],
+                            "Ballot is updated on commit")
+        end)
+        watcher:unregister()
+    end)
+end
+
+local function assert_not_booted(server)
+    local logfile = fio.pathjoin(server.workdir, server.alias .. '.log')
+    t.helpers.retrying({}, function()
+        t.assert_equals(server:grep_log('ready to accept requests', nil,
+                                        {filename = logfile}), nil,
+                        server.alias .. 'does not bootstrap')
+    end)
+end
+
+g_supervised.before_test('test_wait_for_bootstrap_leader', function(cg)
+    cg.replica_set = replica_set:new{}
+    cg.box_cfg = {
+        bootstrap_strategy = 'supervised',
+        replication_timeout = 0.1,
+        replication = {
+            server.build_listen_uri('server1', cg.replica_set.id),
+            server.build_listen_uri('server2', cg.replica_set.id),
+        },
+        replication_connect_timeout = 1000,
+    }
+    for i = 1, 2 do
+        local alias = 'server' .. i
+        cg[alias] = cg.replica_set:build_and_add_server{
+            alias = alias,
+            box_cfg = cg.box_cfg,
+        }
+    end
+end)
+
+local function wait_master_is_seen(replica, master, rs_id)
+    local addr = server.build_listen_uri(master.alias, rs_id)
+    local logfile = fio.pathjoin(replica.workdir, replica.alias .. '.log')
+    local query = string.format('remote master .* at unix/:%s',
+                                addr:gsub('%-', '%%-'))
+    t.helpers.retrying({}, function()
+        t.assert(replica:grep_log(query, nil, {filename = logfile}),
+                 replica.alias .. ' sees ' .. addr)
+    end)
+end
+
+g_supervised.test_wait_for_bootstrap_leader = function(cg)
+    cg.replica_set:start{wait_until_ready = false}
+
+    wait_master_is_seen(cg.server1, cg.server1, cg.replica_set.id)
+    wait_master_is_seen(cg.server1, cg.server2, cg.replica_set.id)
+    wait_master_is_seen(cg.server2, cg.server1, cg.replica_set.id)
+    wait_master_is_seen(cg.server2, cg.server2, cg.replica_set.id)
+
+    fiber.sleep(cg.box_cfg.replication_timeout)
+
+    assert_not_booted(cg.server1)
+    assert_not_booted(cg.server2)
+end
+
+g_supervised.before_test('test_no_bootstrap_without_replication', function(cg)
+    cg.replica_set = replica_set:new{}
+    cg.server1 = cg.replica_set:build_and_add_server{
+        alias = 'server1',
+        box_cfg = {
+            bootstrap_strategy = 'supervised',
+        },
+    }
+end)
+
+g_supervised.test_no_bootstrap_without_replication = function(cg)
+    cg.server1:start{wait_until_ready = false}
+    local logfile = fio.pathjoin(cg.server1.workdir, 'server1.log')
+    local query = "can't initialize storage"
+    t.helpers.retrying({}, function()
+        t.assert(cg.server1:grep_log(query, nil, {filename = logfile}),
+                 'Server fails to boot')
+    end)
+    query = 'failed to connect to the bootstrap leader'
+    t.assert(cg.server1:grep_log(query, nil, {filename = logfile}),
+             'Bootstrap leader not found')
+end
-- 
GitLab