diff --git a/changelogs/unreleased/gh-8509-bootstrap-strategy-supervised.md b/changelogs/unreleased/gh-8509-bootstrap-strategy-supervised.md
new file mode 100644
index 0000000000000000000000000000000000000000..931c352b9a020fc6261e07d45cc9b65b52ceb470
--- /dev/null
+++ b/changelogs/unreleased/gh-8509-bootstrap-strategy-supervised.md
@@ -0,0 +1,18 @@
+## feature/replication
+
+* You may now control which node new replicas choose as a bootstrap leader
+  without touching node config. To do so, set `box.cfg.bootstrap_strategy` to
+  `'supervised'`, and the nodes will only bootstrap off the node on which you
+  called `box.ctl.make_bootstrap_leader()` last.
+  This works on an empty replica set bootstrap as well: start the admin console
+  before configuring the nodes, then configure them:
+  ```lua
+  box.cfg{
+      bootstrap_strategy = 'supervised',
+      replication = ...,
+      listen = ...,
+  }
+  ```
+  Finally, call `box.ctl.make_bootstrap_leader()` through the admin console
+  on the node you want to promote. All the nodes will bootstrap off that node
+  (gh-8509).
diff --git a/src/box/alter.cc b/src/box/alter.cc
index 62f9ecc68fb77318203be1a14868c40075d97036..c81f35462a650f858f52c5fff505c22bc769e326 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -3878,6 +3878,78 @@ on_replace_dd_priv(struct trigger * /* trigger */, void *event)
 
 /* {{{ cluster configuration */
 
+/**
+ * This trigger implements the "last write wins" strategy for the
+ * "bootstrap_leader_uuid" tuple of space _schema. Comparison is performed by
+ * the timestamp and replica_id of the node which authored the change.
+ */
+static int
+before_replace_dd_schema(struct trigger * /* trigger */, void *event)
+{
+	struct txn *txn = (struct txn *)event;
+	struct txn_stmt *stmt = txn_current_stmt(txn);
+	struct tuple *old_tuple = stmt->old_tuple;
+	struct tuple *new_tuple = stmt->new_tuple;
+	const char *key = tuple_field_cstr(new_tuple != NULL ?
+					   new_tuple : old_tuple,
+					   BOX_SCHEMA_FIELD_KEY);
+	if (key == NULL)
+		return -1;
+	if (strcmp(key, "bootstrap_leader_uuid") == 0) {
+		uint64_t old_ts = 0;
+		uint32_t old_id = 0;
+		uint64_t new_ts = UINT64_MAX;
+		uint32_t new_id = UINT32_MAX;
+		int ts_field = BOX_SCHEMA_FIELD_VALUE + 1;
+		int id_field = BOX_SCHEMA_FIELD_VALUE + 2;
+		/*
+		 * Assume anything can be stored in old_tuple, so do not require
+		 * it to have a timestamp or replica_id. In contrast to that,
+		 * always require the new tuple to have a valid timestamp and
+		 * replica_id.
+		 */
+		if (old_tuple != NULL) {
+			const char *field = tuple_field(old_tuple, ts_field);
+			if (field != NULL && mp_typeof(*field) == MP_UINT)
+				old_ts = mp_decode_uint(&field);
+			field = tuple_field(old_tuple, id_field);
+			if (field != NULL && mp_typeof(*field) == MP_UINT)
+				old_id = mp_decode_uint(&field);
+		}
+		if (new_tuple != NULL &&
+		    (tuple_field_u64(new_tuple, ts_field, &new_ts) != 0 ||
+		     tuple_field_u32(new_tuple, id_field, &new_id) != 0)) {
+			return -1;
+		}
+		if (new_ts < old_ts || (new_ts == old_ts && new_id < old_id)) {
+			say_info("Ignore the replace of tuple %s with %s in "
+				 "space _schema: the former has a newer "
+				 "timestamp", tuple_str(old_tuple),
+				 tuple_str(new_tuple));
+			goto return_old;
+		}
+	}
+	return 0;
+return_old:
+	if (new_tuple != NULL)
+		tuple_unref(new_tuple);
+	if (old_tuple != NULL)
+		tuple_ref(old_tuple);
+	stmt->new_tuple = old_tuple;
+	return 0;
+}
+
+/** An on_commit trigger to update bootstrap leader uuid. */
+static int
+on_commit_schema_set_bootstrap_leader_uuid(struct trigger *trigger, void *event)
+{
+	(void)event;
+	struct tt_uuid *uuid = (struct tt_uuid *)trigger->data;
+	bootstrap_leader_uuid = *uuid;
+	box_broadcast_ballot();
+	return 0;
+}
+
 /**
  * This trigger is invoked only upon initial recovery, when
  * reading contents of the system spaces from the snapshot.
@@ -3895,7 +3967,7 @@ on_replace_dd_schema(struct trigger * /* trigger */, void *event)
 	struct tuple *old_tuple = stmt->old_tuple;
 	struct tuple *new_tuple = stmt->new_tuple;
 	const char *key = tuple_field_cstr(new_tuple ? new_tuple : old_tuple,
-					      BOX_SCHEMA_FIELD_KEY);
+					   BOX_SCHEMA_FIELD_KEY);
 	if (key == NULL)
 		return -1;
 	if (strcmp(key, "cluster") == 0) {
@@ -3929,6 +4001,20 @@ on_replace_dd_schema(struct trigger * /* trigger */, void *event)
 			 */
 			dd_version_id = tarantool_version_id();
 		}
+	} else if (strcmp(key, "bootstrap_leader_uuid") == 0) {
+		struct tt_uuid *uuid = xregion_alloc_object(&txn->region,
+							    typeof(*uuid));
+		if (!new_tuple) {
+			*uuid = uuid_nil;
+		} else if (tuple_field_uuid(new_tuple, BOX_SCHEMA_FIELD_VALUE,
+					    uuid) != 0) {
+			return -1;
+		}
+		struct trigger *on_commit = txn_alter_trigger_new(
+			on_commit_schema_set_bootstrap_leader_uuid, uuid);
+		if (on_commit == NULL)
+			return -1;
+		txn_on_commit(txn, on_commit);
 	}
 	return 0;
 }
@@ -4863,6 +4949,7 @@ TRIGGER(alter_space_on_replace_space, on_replace_dd_space);
 TRIGGER(alter_space_on_replace_index, on_replace_dd_index);
 TRIGGER(on_replace_truncate, on_replace_dd_truncate);
 TRIGGER(on_replace_schema, on_replace_dd_schema);
+TRIGGER(before_replace_schema, before_replace_dd_schema);
 TRIGGER(on_replace_user, on_replace_dd_user);
 TRIGGER(on_replace_func, on_replace_dd_func);
 TRIGGER(on_replace_collation, on_replace_dd_collation);
diff --git a/src/box/alter.h b/src/box/alter.h
index 592a91fe9dea68c7eb55129e251928dd7d224cf9..a783c79b9dd758ddde622aae4eb3fa8304e8f705 100644
--- a/src/box/alter.h
+++ b/src/box/alter.h
@@ -32,6 +32,8 @@
  */
 #include "trigger.h"
 
+extern struct trigger before_replace_schema;
+
 extern struct trigger alter_space_on_replace_space;
 extern struct trigger alter_space_on_replace_index;
 extern struct trigger on_replace_truncate;
diff --git a/src/box/box.cc b/src/box/box.cc
index 48f025d895469b030e4e33e61f2d1dddf1c211b9..5d9d1a1f0867a7b7068edb5d48dff22650afd6df 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -114,12 +114,7 @@ const char *box_auth_type;
 
 const char *box_ballot_event_key = "internal.ballot";
 
-/**
- * UUID of the node this instance bootstrapped from.
- * Is only set during the bootstrap and left untouched during automatic
- * rebootstrap and anonymous replica register.
- */
-static struct tt_uuid bootstrap_leader_uuid;
+struct tt_uuid bootstrap_leader_uuid;
 
 /**
  * Set if backup is in progress, i.e. box_backup_start() was
@@ -1082,9 +1077,11 @@ box_check_bootstrap_strategy(void)
 		return BOOTSTRAP_STRATEGY_LEGACY;
 	if (strcmp(strategy, "config") == 0)
 		return BOOTSTRAP_STRATEGY_CONFIG;
+	if (strcmp(strategy, "supervised") == 0)
+		return BOOTSTRAP_STRATEGY_SUPERVISED;
 	diag_set(ClientError, ER_CFG, "bootstrap_strategy",
 		 "the value should be one of the following: "
-		 "'auto', 'config', 'legacy'");
+		 "'auto', 'config', 'supervised', 'legacy'");
 	return BOOTSTRAP_STRATEGY_INVALID;
 }
 
@@ -1838,6 +1835,48 @@ box_set_bootstrap_leader(void)
 					  &cfg_bootstrap_leader_uuid);
 }
 
+/** Persist this instance as the bootstrap leader in _schema space. */
+static int
+box_set_bootstrap_leader_record(void)
+{
+	assert(instance_id != REPLICA_ID_NIL);
+	assert(!tt_uuid_is_nil(&INSTANCE_UUID));
+	return boxk(IPROTO_REPLACE, BOX_SCHEMA_ID,
+		    "[%s%s%" PRIu64 "%" PRIu32 "]", "bootstrap_leader_uuid",
+		    tt_uuid_str(&INSTANCE_UUID), fiber_time64(), instance_id);
+}
+
+int
+box_make_bootstrap_leader(void)
+{
+	if (tt_uuid_is_nil(&INSTANCE_UUID)) {
+		diag_set(ClientError, ER_UNSUPPORTED,
+			 "box.ctl.make_bootstrap_leader()",
+			 "promoting this instance before box.cfg() is called");
+		return -1;
+	}
+	/* Bootstrap strategy is read by the time instance uuid is known. */
+	assert(bootstrap_strategy != BOOTSTRAP_STRATEGY_INVALID);
+	if (bootstrap_strategy != BOOTSTRAP_STRATEGY_SUPERVISED) {
+		diag_set(ClientError, ER_UNSUPPORTED,
+			 tt_sprintf("bootstrap_strategy = '%s'",
+				    cfg_gets("bootstrap_strategy")),
+			 "promoting the bootstrap leader via "
+			 "box.ctl.make_bootstrap_leader()");
+		return -1;
+	}
+	if (is_box_configured) {
+		if (box_check_writable() != 0)
+			return -1;
+		/* Ballot broadcast will happen in an on_commit trigger. */
+		return box_set_bootstrap_leader_record();
+	} else {
+		bootstrap_leader_uuid = INSTANCE_UUID;
+		box_broadcast_ballot();
+		return 0;
+	}
+}
+
 void
 box_set_replication_sync_lag(void)
 {
@@ -4301,6 +4340,8 @@ bootstrap_master(const struct tt_uuid *replicaset_uuid)
 
 	/* Set UUID of a new replica set */
 	box_set_replicaset_uuid(replicaset_uuid);
+	if (bootstrap_strategy == BOOTSTRAP_STRATEGY_SUPERVISED)
+		box_set_bootstrap_leader_record();
 
 	/* Enable WAL subsystem. */
 	if (wal_enable() != 0)
diff --git a/src/box/box.h b/src/box/box.h
index 4fa637fba10604f6a2614eccacadcc9784f5e06c..1f8f44c648b2143b84743dcea17a33e3f1609d62 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -91,6 +91,13 @@ extern double txn_timeout_default;
 /** "internal.ballot" built-in event key. */
 extern const char *box_ballot_event_key;
 
+/**
+ * UUID of the node this instance considers the bootstrap leader. Is broadcast
+ * to replicas via the ballot and influences the replica's choice of the
+ * bootstrap leader.
+ */
+extern struct tt_uuid bootstrap_leader_uuid;
+
 /*
  * Initialize box library
  * @throws C++ exception
@@ -368,6 +375,12 @@ box_iterator_position_unpack(const char *packed_pos,
 			     uint32_t key_part_count, int iterator,
 			     const char **pos, const char **pos_end);
 
+/**
+ * For bootstrap_strategy = "supervised", set this node as the bootstrap leader.
+ */
+int
+box_make_bootstrap_leader(void);
+
 /**
  * Select data, satisfying filters (key and iterator), and dump it to port.
  * If packed_pos is not NULL and *packed_pos is not NULL, selection begins
diff --git a/src/box/lua/ctl.c b/src/box/lua/ctl.c
index ad075fc7baa67f32ebf2af92afb42f5d2038e979..75cb4d3fbd299fdb7643b84a6a0053a7f8ba2805 100644
--- a/src/box/lua/ctl.c
+++ b/src/box/lua/ctl.c
@@ -117,6 +117,14 @@ lbox_ctl_demote(struct lua_State *L)
 	return 0;
 }
 
+static int
+lbox_ctl_make_bootstrap_leader(struct lua_State *L)
+{
+	if (box_make_bootstrap_leader() != 0)
+		return luaT_error(L);
+	return 0;
+}
+
 static int
 lbox_ctl_is_recovery_finished(struct lua_State *L)
 {
@@ -158,6 +166,7 @@ static const struct luaL_Reg lbox_ctl_lib[] = {
 	/* An old alias. */
 	{"clear_synchro_queue", lbox_ctl_promote},
 	{"demote", lbox_ctl_demote},
+	{"make_bootstrap_leader", lbox_ctl_make_bootstrap_leader},
 	{"is_recovery_finished", lbox_ctl_is_recovery_finished},
 	{"set_on_shutdown_timeout", lbox_ctl_set_on_shutdown_timeout},
 	{NULL, NULL}
diff --git a/src/box/replication.cc b/src/box/replication.cc
index deb7da5800045f8b5295de7dcef1263bba17e6e1..ebd85be3802e84759c201f9a165b2303e0282bcd 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -142,6 +142,7 @@ replicaset_connect_quorum(int total)
 	case BOOTSTRAP_STRATEGY_AUTO:
 		return replicaset_connect_quorum_auto(total);
 	case BOOTSTRAP_STRATEGY_CONFIG:
+	case BOOTSTRAP_STRATEGY_SUPERVISED:
 		/*
 		 * During the replica set bootstrap and join we don't care about
 		 * connected node count, we only care about reaching the
@@ -178,6 +179,7 @@ replicaset_sync_quorum(void)
 			   replicaset.applier.total);
 	case BOOTSTRAP_STRATEGY_AUTO:
 	case BOOTSTRAP_STRATEGY_CONFIG:
+	case BOOTSTRAP_STRATEGY_SUPERVISED:
 		return replication_sync_quorum_auto;
 	default:
 		unreachable();
@@ -910,6 +912,7 @@ applier_on_connect_f(struct trigger *trigger, void *event)
 	struct replicaset_connect_state *state = on_connect->state;
 	struct applier *applier = (struct applier *)event;
 
+	fiber_cond_signal(&state->wakeup);
 	if (on_connect->seen_state == applier->state)
 		return 0;
 	on_connect->seen_state = applier->state;
@@ -918,7 +921,6 @@ applier_on_connect_f(struct trigger *trigger, void *event)
 	    applier->state != APPLIER_CONNECTED) {
 		return 0;
 	}
-	fiber_cond_signal(&state->wakeup);
 	applier_pause(applier);
 	return 0;
 }
@@ -927,15 +929,19 @@ applier_on_connect_f(struct trigger *trigger, void *event)
 static bool
 applier_is_bootstrap_leader(const struct applier *applier)
 {
-	if (bootstrap_strategy != BOOTSTRAP_STRATEGY_CONFIG)
-		return false;
-	if (!tt_uuid_is_nil(&cfg_bootstrap_leader_uuid)) {
-		return tt_uuid_compare(&applier->uuid,
-				       &cfg_bootstrap_leader_uuid) == 0;
-	}
-	if (!uri_is_nil(&cfg_bootstrap_leader_uri)) {
-		return uri_addr_is_equal(&applier->uri,
-					 &cfg_bootstrap_leader_uri);
+	assert(!tt_uuid_is_nil(&applier->uuid));
+	if (bootstrap_strategy == BOOTSTRAP_STRATEGY_CONFIG) {
+		if (!tt_uuid_is_nil(&cfg_bootstrap_leader_uuid)) {
+			return tt_uuid_is_equal(&applier->uuid,
+						&cfg_bootstrap_leader_uuid);
+		}
+		if (!uri_is_nil(&cfg_bootstrap_leader_uri)) {
+			return uri_addr_is_equal(&applier->uri,
+						 &cfg_bootstrap_leader_uri);
+		}
+	} else if (bootstrap_strategy == BOOTSTRAP_STRATEGY_SUPERVISED) {
+		return tt_uuid_is_equal(&applier->ballot.bootstrap_leader_uuid,
+					&applier->uuid);
 	}
 	return false;
 }
@@ -965,6 +971,18 @@ replicaset_is_connected(struct replicaset_connect_state *state,
 {
 	if (replicaset_state == REPLICASET_BOOTSTRAP ||
 	    replicaset_state == REPLICASET_JOIN) {
+		/*
+		 * With "supervised" strategy we may continue only once the
+		 * bootstrap leader appears.
+		 */
+		if (bootstrap_strategy == BOOTSTRAP_STRATEGY_SUPERVISED)
+			return bootstrap_leader_is_connected(appliers, count);
+		/*
+		 * With "config" strategy we may continue either when the leader
+		 * appears or when there are no more connections to wait for.
+		 * If none of them is the configured bootstrap_leader, there's
+		 * no point to wait for one.
+		 */
 		if (bootstrap_strategy == BOOTSTRAP_STRATEGY_CONFIG &&
 		    bootstrap_leader_is_connected(appliers, count)) {
 			return true;
@@ -1421,6 +1439,28 @@ replicaset_find_join_master_cfg(void)
 	return leader;
 }
 
+/**
+ * Out of all the replicas, find the one which was promoted via
+ * box.ctl.make_bootstrap_leader().
+ */
+static struct replica *
+replicaset_find_join_master_supervised(void)
+{
+	struct replica *leader = NULL;
+	replicaset_foreach(replica) {
+		struct applier *applier = replica->applier;
+		if (applier == NULL)
+			continue;
+		if (applier_is_bootstrap_leader(applier))
+			leader = replica;
+	}
+	if (leader == NULL) {
+		tnt_raise(ClientError, ER_CFG, "bootstrap_strategy",
+			  "failed to connect to the bootstrap leader");
+	}
+	return leader;
+}
+
 struct replica *
 replicaset_find_join_master(void)
 {
@@ -1430,6 +1470,8 @@ replicaset_find_join_master(void)
 		return replicaset_find_join_master_auto();
 	case BOOTSTRAP_STRATEGY_CONFIG:
 		return replicaset_find_join_master_cfg();
+	case BOOTSTRAP_STRATEGY_SUPERVISED:
+		return replicaset_find_join_master_supervised();
 	default:
 		unreachable();
 	}
diff --git a/src/box/replication.h b/src/box/replication.h
index 9f1014de78ca3c6be05496e926e3f69a88e2893f..333b34381b9a54586cb98e6969c777262cf8481f 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -105,6 +105,7 @@ enum bootstrap_strategy {
 	BOOTSTRAP_STRATEGY_AUTO,
 	BOOTSTRAP_STRATEGY_LEGACY,
 	BOOTSTRAP_STRATEGY_CONFIG,
+	BOOTSTRAP_STRATEGY_SUPERVISED,
 };
 
 /** Instance's bootstrap strategy. Controls replication reconfiguration. */
diff --git a/src/box/schema.cc b/src/box/schema.cc
index 94a487b8557456d67f56e778391d996738686089..bbd62834217603726663efb2ecb6635c3f9dd9c4 100644
--- a/src/box/schema.cc
+++ b/src/box/schema.cc
@@ -222,6 +222,9 @@ schema_init(void)
 	key_parts[0].type = FIELD_TYPE_STRING;
 	sc_space_new(BOX_SCHEMA_ID, "_schema", key_parts, 1,
 		     &on_replace_schema);
+	struct space *schema = space_by_id(BOX_SCHEMA_ID);
+	assert(schema != NULL);
+	trigger_add(&schema->before_replace, &before_replace_schema);
 
 	/* _collation - collation description. */
 	key_parts[0].fieldno = 0;
diff --git a/test/replication-luatest/bootstrap_strategy_test.lua b/test/replication-luatest/bootstrap_strategy_test.lua
index 2eedff913573ec00b167f5ee0e2cab7dab9ac340..53d455c380361cce538c40295e20dff360eecd3b 100644
--- a/test/replication-luatest/bootstrap_strategy_test.lua
+++ b/test/replication-luatest/bootstrap_strategy_test.lua
@@ -2,6 +2,8 @@ local t = require('luatest')
 local server = require('luatest.server')
 local replica_set = require('luatest.replica_set')
 local fio = require('fio')
+local fiber = require('fiber')
+local socket = require('socket')
 
 local g_auto = t.group('gh-5272-bootstrap-strategy-auto')
 
@@ -446,3 +448,250 @@ g_config_fail.test_bad_uri_or_uuid = function()
         })
     end
 end
+
+local g_supervised = t.group('gh-8509-bootstrap-strategy-supervised')
+
+local server2_admin
+local SOCKET_TIMEOUT = 5
+
+local function make_bootstrap_leader_initial(sockname)
+    local sock, err = socket.tcp_connect('unix/', sockname)
+    t.assert_equals(err, nil, 'Connection successful')
+    local greeting = sock:read(128, SOCKET_TIMEOUT)
+    t.assert_str_contains(greeting, 'Tarantool', 'Connected to console')
+    t.assert_str_contains(greeting, 'Lua console', 'Connected to console')
+    sock:write('box.ctl.make_bootstrap_leader()\n', SOCKET_TIMEOUT)
+    local response = sock:read(8, SOCKET_TIMEOUT)
+    t.assert_equals(response, '---\n...\n', 'The call succeeded')
+    sock:close()
+end
+
+g_supervised.after_each(function(cg)
+    cg.replica_set:drop()
+end)
+
+g_supervised.before_test('test_bootstrap', function(cg)
+    cg.replica_set = replica_set:new{}
+    cg.box_cfg = {
+        bootstrap_strategy = 'supervised',
+        replication = {
+            server.build_listen_uri('server1', cg.replica_set.id),
+            server.build_listen_uri('server2', cg.replica_set.id),
+        },
+        replication_timeout = 0.1,
+    }
+    for i = 1, 2 do
+        local alias = 'server' .. i
+        cg[alias] = cg.replica_set:build_and_add_server{
+            alias = alias,
+            box_cfg = cg.box_cfg,
+        }
+    end
+    cg.server1.box_cfg.instance_uuid = uuid1
+    cg.server2.box_cfg.instance_uuid = uuid2
+    server2_admin = fio.pathjoin(cg.server2.workdir, 'server2.admin')
+    local run_before_cfg = string.format([[
+        local console = require('console')
+        console.listen('unix/:%s')
+    ]], server2_admin)
+
+    cg.server2.env.TARANTOOL_RUN_BEFORE_BOX_CFG = run_before_cfg
+end)
+
+g_supervised.test_bootstrap = function(cg)
+    cg.replica_set:start{wait_until_ready = false}
+    t.helpers.retrying({}, make_bootstrap_leader_initial, server2_admin)
+    cg.server1:wait_until_ready()
+    cg.server2:wait_until_ready()
+    t.assert_equals(cg.server2:get_instance_id(), 1,
+                    'Server 2 is the bootstrap leader');
+    cg.server2:exec(function()
+        local tup = box.space._schema:get{'bootstrap_leader_uuid'}
+        t.assert(tup ~= nil, 'Bootstrap leader uuid is persisted')
+        t.assert_equals(tup[2], box.info.uuid,
+                        'Bootstrap leader uuid is correct')
+    end)
+    t.helpers.retrying({}, cg.server1.assert_follows_upstream, cg.server1, 1)
+
+    cg.server3 = cg.replica_set:build_and_add_server{
+        alias = 'server3',
+        box_cfg = cg.box_cfg,
+    }
+    cg.server3:start()
+    local query = string.format('bootstrapping replica from %s',
+                                uuid2:gsub('%-', '%%-'))
+    t.assert(cg.server3:grep_log(query), 'Server3 bootstrapped from server2')
+
+    cg.server1:exec(function()
+        box.ctl.make_bootstrap_leader()
+        local tup = box.space._schema:get{'bootstrap_leader_uuid'}
+        t.assert_equals(tup[2], box.info.uuid,
+                        'Bootstrap leader is updated')
+    end)
+    cg.server4 = cg.replica_set:build_and_add_server{
+        alias = 'server4',
+        box_cfg = cg.box_cfg,
+    }
+    cg.server4:start()
+    query = string.format('bootstrapping replica from %s',
+                          uuid1:gsub('%-', '%%-'))
+    t.assert(cg.server4:grep_log(query), 'Server4 bootstrapped from server1')
+
+end
+
+g_supervised.before_test('test_schema_triggers', function(cg)
+    cg.replica_set = replica_set:new{}
+    cg.server1 = cg.replica_set:build_and_add_server{alias = 'server1'}
+end)
+
+g_supervised.test_schema_triggers = function(cg)
+    cg.replica_set:start{}
+    cg.server1:exec(function()
+        local uuid2 = '22222222-2222-2222-2222-222222222222'
+        local uuid3 = '33333333-3333-3333-3333-333333333333'
+        box.cfg{bootstrap_strategy = 'supervised'}
+        box.ctl.make_bootstrap_leader()
+        local old_tuple = box.space._schema:get{'bootstrap_leader_uuid'}
+        local new_tuple = old_tuple:update{{'=', 2, 'not a uuid'}}
+        t.assert_error_msg_contains('Invalid UUID', function()
+            box.space._schema:replace(new_tuple)
+        end)
+        new_tuple = old_tuple:update{{'=', 3, 'not a timestamp'}}
+        t.assert_error_msg_contains('expected unsigned, got string', function()
+            box.space._schema:replace(new_tuple)
+        end)
+        new_tuple = old_tuple:update{{'=', 4, 'not a replica id'}}
+        t.assert_error_msg_contains('expected unsigned, got string', function()
+            box.space._schema:replace(new_tuple)
+        end)
+        new_tuple = old_tuple:update{{'-', 3, 1}}
+        box.space._schema:replace(new_tuple)
+        t.assert_equals(box.space._schema:get{'bootstrap_leader_uuid'},
+                        old_tuple, 'Last write wins by timestamp - old tuple')
+        new_tuple = old_tuple:update{{'-', 4, 1}}
+        box.space._schema:replace(new_tuple)
+        t.assert_equals(box.space._schema:get{'bootstrap_leader_uuid'},
+                        old_tuple, 'Last write wins by replica id - old tuple')
+        new_tuple = old_tuple:update{{'+', 3, 1}}
+        box.space._schema:replace(new_tuple)
+        t.assert_equals(box.space._schema:get{'bootstrap_leader_uuid'},
+                        new_tuple, 'Last write wins by timestamp - new tuple')
+        old_tuple = new_tuple
+        new_tuple = old_tuple:update{{'+', 4, 1}}
+        box.space._schema:replace(new_tuple)
+        t.assert_equals(box.space._schema:get{'bootstrap_leader_uuid'},
+                        new_tuple, 'Last write wins by replica id - new tuple')
+
+        local ballot_uuid
+        local watcher = box.watch('internal.ballot', function(_, ballot)
+                local ballot_key = box.iproto.key.BALLOT
+                local uuid_key = box.iproto.ballot_key.BOOTSTRAP_LEADER_UUID
+                ballot_uuid = ballot[ballot_key][uuid_key]
+        end)
+        t.helpers.retrying({}, function()
+            t.assert_equals(ballot_uuid, new_tuple[2],
+                            'Ballot stores correct uuid')
+        end)
+        old_tuple = new_tuple
+        new_tuple = old_tuple:update{{'=', 2, uuid2}, {'-', 3, 1}}
+        box.space._schema:replace(new_tuple)
+        t.assert_equals(ballot_uuid, old_tuple[2],
+                        "Ballot isn't updated if the tuple is rejected")
+        new_tuple = new_tuple:update{{'+', 3, 1}}
+        box.space._schema:replace(new_tuple)
+        t.helpers.retrying({}, function()
+            t.assert_equals(ballot_uuid, new_tuple[2],
+                            'Ballot updates the uuid')
+        end)
+        old_tuple = new_tuple
+        new_tuple = new_tuple:update{{'=', 2, uuid3}}
+        box.begin()
+        box.space._schema:replace(new_tuple)
+        local new_uuid = ballot_uuid
+        box.commit()
+        t.assert_equals(new_uuid, old_tuple[2],
+                        "Ballot isn't updated before commit")
+        t.helpers.retrying({}, function()
+            t.assert_equals(ballot_uuid, new_tuple[2],
+                            "Ballot is updated on commit")
+        end)
+        watcher:unregister()
+    end)
+end
+
+local function assert_not_booted(server)
+    local logfile = fio.pathjoin(server.workdir, server.alias .. '.log')
+    t.helpers.retrying({}, function()
+        t.assert_equals(server:grep_log('ready to accept requests', nil,
+                                        {filename = logfile}), nil,
+                        server.alias .. ' does not bootstrap')
+    end)
+end
+
+g_supervised.before_test('test_wait_for_bootstrap_leader', function(cg)
+    cg.replica_set = replica_set:new{}
+    cg.box_cfg = {
+        bootstrap_strategy = 'supervised',
+        replication_timeout = 0.1,
+        replication = {
+            server.build_listen_uri('server1', cg.replica_set.id),
+            server.build_listen_uri('server2', cg.replica_set.id),
+        },
+        replication_connect_timeout = 1000,
+    }
+    for i = 1, 2 do
+        local alias = 'server' .. i
+        cg[alias] = cg.replica_set:build_and_add_server{
+            alias = alias,
+            box_cfg = cg.box_cfg,
+        }
+    end
+end)
+
+local function wait_master_is_seen(replica, master, rs_id)
+    local addr = server.build_listen_uri(master.alias, rs_id)
+    local logfile = fio.pathjoin(replica.workdir, replica.alias .. '.log')
+    local query = string.format('remote master .* at unix/:%s',
+                                addr:gsub('%-', '%%-'))
+    t.helpers.retrying({}, function()
+        t.assert(replica:grep_log(query, nil, {filename = logfile}),
+                 replica.alias .. ' sees ' .. addr)
+    end)
+end
+
+g_supervised.test_wait_for_bootstrap_leader = function(cg)
+    cg.replica_set:start{wait_until_ready = false}
+
+    wait_master_is_seen(cg.server1, cg.server1, cg.replica_set.id)
+    wait_master_is_seen(cg.server1, cg.server2, cg.replica_set.id)
+    wait_master_is_seen(cg.server2, cg.server1, cg.replica_set.id)
+    wait_master_is_seen(cg.server2, cg.server2, cg.replica_set.id)
+
+    fiber.sleep(cg.box_cfg.replication_timeout)
+
+    assert_not_booted(cg.server1)
+    assert_not_booted(cg.server2)
+end
+
+g_supervised.before_test('test_no_bootstrap_without_replication', function(cg)
+    cg.replica_set = replica_set:new{}
+    cg.server1 = cg.replica_set:build_and_add_server{
+        alias = 'server1',
+        box_cfg = {
+            bootstrap_strategy = 'supervised',
+        },
+    }
+end)
+
+g_supervised.test_no_bootstrap_without_replication = function(cg)
+    cg.server1:start{wait_until_ready = false}
+    local logfile = fio.pathjoin(cg.server1.workdir, 'server1.log')
+    local query = "can't initialize storage"
+    t.helpers.retrying({}, function()
+        t.assert(cg.server1:grep_log(query, nil, {filename = logfile}),
+                 'Server fails to boot')
+    end)
+    query = 'failed to connect to the bootstrap leader'
+    t.assert(cg.server1:grep_log(query, nil, {filename = logfile}),
+             'Bootstrap leader not found')
+end