diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index a467d3517e92ef0a6e7bce892b1daf65fdca7436..b5af1c7d821631fb6b98e00a162b6e75565fe29a 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -72,6 +72,7 @@ add_library(box STATIC
     memtx_engine.c
     memtx_space.c
     sysview.c
+    blackhole.c
     vinyl.c
     vy_stmt.c
     vy_mem.c
diff --git a/src/box/alter.cc b/src/box/alter.cc
index 7b6bd1a5aca7ec7cd9483cb4f88d8ea6864f2895..0a17a0c3552fe18fa4c1fcb6ef21567fa86574ae 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -86,6 +86,13 @@ access_check_ddl(const char *name, uint32_t owner_uid,
 	user_access_t access = ((PRIV_U | (user_access_t) priv_type) &
 				~has_access);
 	bool is_owner = owner_uid == cr->uid || cr->uid == ADMIN;
+	if (access == 0)
+		return; /* Access granted. */
+	/* Check for specific entity access. */
+	struct access *object = entity_access_get(type);
+	if (object) {
+		access &= ~object[cr->auth_token].effective;
+	}
 	/*
 	 * Only the owner of the object or someone who has
 	 * specific DDL privilege on the object can execute
@@ -95,7 +102,7 @@ access_check_ddl(const char *name, uint32_t owner_uid,
 	 * the owner of the object, but this should be ignored --
 	 * CREATE privilege is required.
 	 */
-	if (access == 0 || (is_owner && !(access & (PRIV_U|PRIV_C))))
+	if (access == 0 || (is_owner && !(access & (PRIV_U | PRIV_C))))
 		return; /* Access granted. */
 
 	/* Create a meaningful error message. */
@@ -3126,7 +3133,7 @@ on_replace_dd_sequence(struct trigger * /* trigger */, void *event)
 						      ER_CREATE_SEQUENCE);
 		assert(sequence_by_id(new_def->id) == NULL);
 		access_check_ddl(new_def->name, new_def->uid, SC_SEQUENCE,
-			PRIV_C, false);
+				 PRIV_C, false);
 		sequence_cache_replace(new_def);
 		alter->new_def = new_def;
 	} else if (old_tuple != NULL && new_tuple == NULL) {	/* DELETE */
@@ -3231,8 +3238,19 @@ on_replace_dd_space_sequence(struct trigger * /* trigger */, void *event)
 		priv_type = PRIV_A;
 
 	/* Check we have the correct access type on the sequence.  * */
-	access_check_ddl(seq->def->name, seq->def->uid, SC_SEQUENCE, priv_type,
-			 false);
+	if (is_generated || !stmt->new_tuple) {
+		access_check_ddl(seq->def->name, seq->def->uid, SC_SEQUENCE,
+				 priv_type, false);
+	} else {
+		/*
+		 * In case user wants to attach an existing sequence,
+		 * check that it has read and write access.
+		 */
+		access_check_ddl(seq->def->name, seq->def->uid, SC_SEQUENCE,
+				 PRIV_R, false);
+		access_check_ddl(seq->def->name, seq->def->uid, SC_SEQUENCE,
+				 PRIV_W, false);
+	}
 	/** Check we have alter access on space. */
 	access_check_ddl(space->def->name, space->def->uid, SC_SPACE, PRIV_A,
 			 false);
diff --git a/src/box/blackhole.c b/src/box/blackhole.c
new file mode 100644
index 0000000000000000000000000000000000000000..aafbfbf6592fc726a01f421f4e7f5085804b2825
--- /dev/null
+++ b/src/box/blackhole.c
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "blackhole.h"
+
+#include <stddef.h>
+#include <stdlib.h>
+#include <small/rlist.h>
+
+#include "diag.h"
+#include "error.h"
+#include "errcode.h"
+#include "engine.h"
+#include "space.h"
+#include "txn.h"
+#include "tuple.h"
+#include "xrow.h"
+
+static void
+blackhole_space_destroy(struct space *space)
+{
+	free(space);
+}
+
+static int
+blackhole_space_execute_replace(struct space *space, struct txn *txn,
+				struct request *request, struct tuple **result)
+{
+	struct txn_stmt *stmt = txn_current_stmt(txn);
+	stmt->new_tuple = tuple_new(space->format, request->tuple,
+				    request->tuple_end);
+	if (stmt->new_tuple == NULL)
+		return -1;
+	tuple_ref(stmt->new_tuple);
+	*result = stmt->new_tuple;
+	return 0;
+}
+
+static int
+blackhole_space_execute_delete(struct space *space, struct txn *txn,
+			       struct request *request, struct tuple **result)
+{
+	(void)space;
+	(void)txn;
+	(void)request;
+	(void)result;
+	diag_set(ClientError, ER_UNSUPPORTED, "Blackhole", "delete()");
+	return -1;
+}
+
+static int
+blackhole_space_execute_update(struct space *space, struct txn *txn,
+			       struct request *request, struct tuple **result)
+{
+	(void)space;
+	(void)txn;
+	(void)request;
+	(void)result;
+	diag_set(ClientError, ER_UNSUPPORTED, "Blackhole", "update()");
+	return -1;
+}
+
+static int
+blackhole_space_execute_upsert(struct space *space, struct txn *txn,
+			       struct request *request)
+{
+	(void)space;
+	(void)txn;
+	(void)request;
+	diag_set(ClientError, ER_UNSUPPORTED, "Blackhole", "upsert()");
+	return -1;
+}
+
+static struct index *
+blackhole_space_create_index(struct space *space, struct index_def *def)
+{
+	(void)space;
+	(void)def;
+	/* See blackhole_engine_create_space(). */
+	unreachable();
+	return NULL;
+}
+
+static const struct space_vtab blackhole_space_vtab = {
+	/* .destroy = */ blackhole_space_destroy,
+	/* .bsize = */ generic_space_bsize,
+	/* .apply_initial_join_row = */ generic_space_apply_initial_join_row,
+	/* .execute_replace = */ blackhole_space_execute_replace,
+	/* .execute_delete = */ blackhole_space_execute_delete,
+	/* .execute_update = */ blackhole_space_execute_update,
+	/* .execute_upsert = */ blackhole_space_execute_upsert,
+	/* .ephemeral_replace = */ generic_space_ephemeral_replace,
+	/* .ephemeral_delete = */ generic_space_ephemeral_delete,
+	/* .init_system_space = */ generic_init_system_space,
+	/* .init_ephemeral_space = */ generic_init_ephemeral_space,
+	/* .check_index_def = */ generic_space_check_index_def,
+	/* .create_index = */ blackhole_space_create_index,
+	/* .add_primary_key = */ generic_space_add_primary_key,
+	/* .drop_primary_key = */ generic_space_drop_primary_key,
+	/* .check_format = */ generic_space_check_format,
+	/* .build_index = */ generic_space_build_index,
+	/* .swap_index = */ generic_space_swap_index,
+	/* .prepare_alter = */ generic_space_prepare_alter,
+};
+
+static void
+blackhole_engine_shutdown(struct engine *engine)
+{
+	free(engine);
+}
+
+static struct space *
+blackhole_engine_create_space(struct engine *engine, struct space_def *def,
+			      struct rlist *key_list)
+{
+	if (!rlist_empty(key_list)) {
+		diag_set(ClientError, ER_UNSUPPORTED, "Blackhole", "indexes");
+		return NULL;
+	}
+
+	struct space *space = (struct space *)calloc(1, sizeof(*space));
+	if (space == NULL) {
+		diag_set(OutOfMemory, sizeof(*space),
+			 "malloc", "struct space");
+		return NULL;
+	}
+
+	/* Allocate tuples on runtime arena, but check space format. */
+	struct tuple_format *format;
+	format = tuple_format_new(&tuple_format_runtime->vtab, NULL, 0, 0,
+				  def->fields, def->field_count, def->dict);
+	if (format == NULL) {
+		free(space);
+		return NULL;
+	}
+	format->exact_field_count = def->exact_field_count;
+	tuple_format_ref(format);
+
+	if (space_create(space, engine, &blackhole_space_vtab,
+			 def, key_list, format) != 0) {
+		tuple_format_unref(format);
+		free(space);
+		return NULL;
+	}
+	return space;
+}
+
+static const struct engine_vtab blackhole_engine_vtab = {
+	/* .shutdown = */ blackhole_engine_shutdown,
+	/* .create_space = */ blackhole_engine_create_space,
+	/* .join = */ generic_engine_join,
+	/* .begin = */ generic_engine_begin,
+	/* .begin_statement = */ generic_engine_begin_statement,
+	/* .prepare = */ generic_engine_prepare,
+	/* .commit = */ generic_engine_commit,
+	/* .rollback_statement = */ generic_engine_rollback_statement,
+	/* .rollback = */ generic_engine_rollback,
+	/* .bootstrap = */ generic_engine_bootstrap,
+	/* .begin_initial_recovery = */ generic_engine_begin_initial_recovery,
+	/* .begin_final_recovery = */ generic_engine_begin_final_recovery,
+	/* .end_recovery = */ generic_engine_end_recovery,
+	/* .begin_checkpoint = */ generic_engine_begin_checkpoint,
+	/* .wait_checkpoint = */ generic_engine_wait_checkpoint,
+	/* .commit_checkpoint = */ generic_engine_commit_checkpoint,
+	/* .abort_checkpoint = */ generic_engine_abort_checkpoint,
+	/* .collect_garbage = */ generic_engine_collect_garbage,
+	/* .backup = */ generic_engine_backup,
+	/* .memory_stat = */ generic_engine_memory_stat,
+	/* .reset_stat = */ generic_engine_reset_stat,
+	/* .check_space_def = */ generic_engine_check_space_def,
+};
+
+struct engine *
+blackhole_engine_new(void)
+{
+	struct engine *engine = calloc(1, sizeof(*engine));
+	if (engine == NULL) {
+		diag_set(OutOfMemory, sizeof(*engine),
+			 "malloc", "struct engine");
+		return NULL;
+	}
+
+	engine->vtab = &blackhole_engine_vtab;
+	engine->name = "blackhole";
+	engine->flags = ENGINE_BYPASS_TX;
+	return engine;
+}
diff --git a/src/box/blackhole.h b/src/box/blackhole.h
new file mode 100644
index 0000000000000000000000000000000000000000..5a78610d978c7b8181293a9e6babd140b7296849
--- /dev/null
+++ b/src/box/blackhole.h
@@ -0,0 +1,58 @@
+#ifndef TARANTOOL_BOX_BLACKHOLE_H_INCLUDED
+#define TARANTOOL_BOX_BLACKHOLE_H_INCLUDED
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include <stddef.h>
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+struct engine *
+blackhole_engine_new(void);
+
+#if defined(__cplusplus)
+} /* extern "C" */
+
+#include "diag.h"
+
+static inline struct engine *
+blackhole_engine_new_xc(void)
+{
+	struct engine *engine = blackhole_engine_new();
+	if (engine == NULL)
+		diag_raise();
+	return engine;
+}
+
+#endif /* defined(__plusplus) */
+
+#endif /* TARANTOOL_BOX_BLACKHOLE_H_INCLUDED */
diff --git a/src/box/box.cc b/src/box/box.cc
index b6c22b081992b09b07314f670afb7b4eddb9247f..62fd05468a16e2db0a582e3fec5596d39630f297 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -51,6 +51,7 @@
 #include "engine.h"
 #include "memtx_engine.h"
 #include "sysview.h"
+#include "blackhole.h"
 #include "vinyl.h"
 #include "space.h"
 #include "index.h"
@@ -1661,6 +1662,9 @@ engine_init()
 	struct sysview_engine *sysview = sysview_engine_new_xc();
 	engine_register((struct engine *)sysview);
 
+	struct engine *blackhole = blackhole_engine_new_xc();
+	engine_register(blackhole);
+
 	struct vinyl_engine *vinyl;
 	vinyl = vinyl_engine_new_xc(cfg_gets("vinyl_dir"),
 				    cfg_geti64("vinyl_memory"),
@@ -1725,7 +1729,8 @@ bootstrap_from_master(struct replica *master)
 	applier_resume_to_state(applier, APPLIER_READY, TIMEOUT_INFINITY);
 	assert(applier->state == APPLIER_READY);
 
-	say_info("bootstrapping replica from %s",
+	say_info("bootstrapping replica from %s at %s",
+		 tt_uuid_str(&master->uuid),
 		 sio_strfaddr(&applier->addr, applier->addr_len));
 
 	/*
@@ -1823,6 +1828,9 @@ bootstrap(const struct tt_uuid *instance_uuid,
 /**
  * Recover the instance from the local directory.
  * Enter hot standby if the directory is locked.
+ * Invoke rebootstrap if the instance fell too much
+ * behind its peers in the replica set and needs
+ * to be rebootstrapped.
  */
 static void
 local_recovery(const struct tt_uuid *instance_uuid,
@@ -1858,6 +1866,12 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	if (wal_dir_lock >= 0) {
 		box_listen();
 		box_sync_replication(replication_connect_timeout, false);
+
+		struct replica *master;
+		if (replicaset_needs_rejoin(&master)) {
+			say_crit("replica is too old, initiating rebootstrap");
+			return bootstrap_from_master(master);
+		}
 	}
 
 	/*
diff --git a/src/box/engine.h b/src/box/engine.h
index e10eaec445850553048bb2eb5b7f3e351e547d07..5b96c744de8944a506f706517ecb38e24f02d35d 100644
--- a/src/box/engine.h
+++ b/src/box/engine.h
@@ -190,6 +190,16 @@ struct engine_vtab {
 	int (*check_space_def)(struct space_def *);
 };
 
+enum {
+	/**
+	 * If set, the engine will not participate in transaction
+	 * control. In particular, this means that any operations
+	 * done on this engine's spaces can mix in other engine's
+	 * transactions w/o throwing ER_CROSS_ENGINE_TRANSACTION.
+	 */
+	ENGINE_BYPASS_TX = 1 << 0,
+};
+
 struct engine {
 	/** Virtual function table. */
 	const struct engine_vtab *vtab;
@@ -197,6 +207,8 @@ struct engine {
 	const char *name;
 	/** Engine id. */
 	uint32_t id;
+	/** Engine flags. */
+	uint32_t flags;
 	/** Used for search for engine by name. */
 	struct rlist link;
 };
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 4cacbc840a5546eaad887573aa2c081aa1d6b0c7..05468f203ce973dcda180fcaf31ae3efcc06d80b 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -287,6 +287,9 @@ relay_final_join(struct replica *replica, int fd, uint64_t sync,
 	if (rc != 0)
 		diag_raise();
 
+	ERROR_INJECT(ERRINJ_RELAY_FINAL_JOIN,
+		     tnt_raise(ClientError, ER_INJECTION, "relay final join"));
+
 	ERROR_INJECT(ERRINJ_RELAY_FINAL_SLEEP, {
 		while (vclock_compare(stop_vclock, &replicaset.vclock) == 0)
 			fiber_sleep(0.001);
diff --git a/src/box/replication.cc b/src/box/replication.cc
index c4d6e6f2690072fc725489731fa9441237c32161..26bbbe32a860599a854ddc684bd54844378bfd33 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -41,6 +41,7 @@
 #include "error.h"
 #include "relay.h"
 #include "vclock.h" /* VCLOCK_MAX */
+#include "sio.h"
 
 uint32_t instance_id = REPLICA_ID_NIL;
 struct tt_uuid INSTANCE_UUID;
@@ -205,6 +206,19 @@ replica_clear_id(struct replica *replica)
 	 */
 	replicaset.replica_by_id[replica->id] = NULL;
 	replica->id = REPLICA_ID_NIL;
+	/*
+	 * The replica will never resubscribe so we don't need to keep
+	 * WALs for it anymore. Unregister it with the garbage collector
+	 * if the relay thread is stopped. In case the relay thread is
+	 * still running, it may need to access replica->gc so leave the
+	 * job to replica_on_relay_stop, which will be called as soon as
+	 * the relay thread exits.
+	 */
+	if (replica->gc != NULL &&
+	    relay_get_state(replica->relay) != RELAY_FOLLOW) {
+		gc_consumer_unregister(replica->gc);
+		replica->gc = NULL;
+	}
 	if (replica_is_orphan(replica)) {
 		replica_hash_remove(&replicaset.hash, replica);
 		replica_delete(replica);
@@ -625,6 +639,66 @@ replicaset_connect(struct applier **appliers, int count,
 		  "failed to connect to one or more replicas");
 }
 
+bool
+replicaset_needs_rejoin(struct replica **master)
+{
+	struct replica *leader = NULL;
+	replicaset_foreach(replica) {
+		struct applier *applier = replica->applier;
+		if (applier == NULL)
+			continue;
+
+		const struct ballot *ballot = &applier->ballot;
+		if (vclock_compare(&ballot->gc_vclock,
+				   &replicaset.vclock) <= 0) {
+			/*
+			 * There's at least one master that still stores
+			 * WALs needed by this instance. Proceed to local
+			 * recovery.
+			 */
+			return false;
+		}
+
+		const char *uuid_str = tt_uuid_str(&replica->uuid);
+		const char *addr_str = sio_strfaddr(&applier->addr,
+						applier->addr_len);
+		char *local_vclock_str = vclock_to_string(&replicaset.vclock);
+		char *remote_vclock_str = vclock_to_string(&ballot->vclock);
+		char *gc_vclock_str = vclock_to_string(&ballot->gc_vclock);
+
+		say_info("can't follow %s at %s: required %s available %s",
+			 uuid_str, addr_str, local_vclock_str, gc_vclock_str);
+
+		if (vclock_compare(&replicaset.vclock, &ballot->vclock) > 0) {
+			/*
+			 * Replica has some rows that are not present on
+			 * the master. Don't rebootstrap as we don't want
+			 * to lose any data.
+			 */
+			say_info("can't rebootstrap from %s at %s: "
+				 "replica has local rows: local %s remote %s",
+				 uuid_str, addr_str, local_vclock_str,
+				 remote_vclock_str);
+			goto next;
+		}
+
+		/* Prefer a master with the max vclock. */
+		if (leader == NULL ||
+		    vclock_sum(&ballot->vclock) >
+		    vclock_sum(&leader->applier->ballot.vclock))
+			leader = replica;
+next:
+		free(local_vclock_str);
+		free(remote_vclock_str);
+		free(gc_vclock_str);
+	}
+	if (leader == NULL)
+		return false;
+
+	*master = leader;
+	return true;
+}
+
 void
 replicaset_follow(void)
 {
@@ -700,6 +774,16 @@ replicaset_check_quorum(void)
 void
 replica_on_relay_stop(struct replica *replica)
 {
+	/*
+	 * If the replica was evicted from the cluster, we don't
+	 * need to keep WALs for it anymore. Unregister it with
+	 * the garbage collector then. See also replica_clear_id.
+	 */
+	assert(replica->gc != NULL);
+	if (replica->id == REPLICA_ID_NIL) {
+		gc_consumer_unregister(replica->gc);
+		replica->gc = NULL;
+	}
 	if (replica_is_orphan(replica)) {
 		replica_hash_remove(&replicaset.hash, replica);
 		replica_delete(replica);
diff --git a/src/box/replication.h b/src/box/replication.h
index fdf995c310ed250cfe8ca8fae5ffd944342d1dc8..e8b391af27c8298ae2f6e091cd35625f916448ba 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -359,6 +359,15 @@ void
 replicaset_connect(struct applier **appliers, int count,
 		   double timeout, bool connect_all);
 
+/**
+ * Check if the current instance fell too much behind its
+ * peers in the replica set and needs to be rebootstrapped.
+ * If it does, return true and set @master to the instance
+ * to use for rebootstrap, otherwise return false.
+ */
+bool
+replicaset_needs_rejoin(struct replica **master);
+
 /**
  * Resume all appliers registered with the replica set.
  */
diff --git a/src/box/space.c b/src/box/space.c
index e53f1598ca1eb93b4fabde861d685f8cadce6c56..dad47e6fc630f818defb4dad8c5d569bc29b9f40 100644
--- a/src/box/space.c
+++ b/src/box/space.c
@@ -285,23 +285,20 @@ static int
 space_before_replace(struct space *space, struct txn *txn,
 		     struct request *request)
 {
-	if (space->index_count == 0) {
-		/* Empty space, nothing to do. */
-		return 0;
-	}
-
 	struct region *gc = &fiber()->gc;
 	enum iproto_type type = request->type;
-	struct index *pk = space->index[0];
+	struct index *pk = space_index(space, 0);
 
-	const char *key;
-	uint32_t part_count;
-	struct index *index;
+	const char *key = NULL;
+	uint32_t part_count = 0;
+	struct index *index = NULL;
 
 	/*
 	 * Lookup the old tuple.
 	 */
-	if (type == IPROTO_UPDATE || type == IPROTO_DELETE) {
+	switch (type) {
+	case IPROTO_UPDATE:
+	case IPROTO_DELETE:
 		index = index_find_unique(space, request->index_id);
 		if (index == NULL)
 			return -1;
@@ -310,21 +307,27 @@ space_before_replace(struct space *space, struct txn *txn,
 		if (exact_key_validate(index->def->key_def,
 				       key, part_count) != 0)
 			return -1;
-	} else if (type == IPROTO_INSERT || type == IPROTO_REPLACE ||
-		   type == IPROTO_UPSERT) {
+		break;
+	case IPROTO_INSERT:
+	case IPROTO_REPLACE:
+	case IPROTO_UPSERT:
+		if (pk == NULL)
+			break;
 		index = pk;
 		key = tuple_extract_key_raw(request->tuple, request->tuple_end,
 					    index->def->key_def, NULL);
 		if (key == NULL)
 			return -1;
 		part_count = mp_decode_array(&key);
-	} else {
+		break;
+	default:
 		/* Unknown request type, nothing to do. */
 		return 0;
 	}
 
-	struct tuple *old_tuple;
-	if (index_get(index, key, part_count, &old_tuple) != 0)
+	struct tuple *old_tuple = NULL;
+	if (index != NULL &&
+	    index_get(index, key, part_count, &old_tuple) != 0)
 		return -1;
 
 	/*
@@ -435,7 +438,8 @@ space_before_replace(struct space *space, struct txn *txn,
 	 * We don't allow to change the value of the primary key
 	 * in the same statement.
 	 */
-	if (request_changed && old_tuple != NULL && new_tuple != NULL &&
+	if (pk != NULL && request_changed &&
+	    old_tuple != NULL && new_tuple != NULL &&
 	    tuple_compare(old_tuple, new_tuple, pk->def->key_def) != 0) {
 		diag_set(ClientError, ER_CANT_UPDATE_PRIMARY_KEY,
 			 pk->def->name, space->def->name);
diff --git a/src/box/sysview.c b/src/box/sysview.c
index a215e9ee1a29c6b3a2e12dc9bf7b6d8a2fa6e06b..ed5bca38efc65037169afd6afc08ea63187a59f3 100644
--- a/src/box/sysview.c
+++ b/src/box/sysview.c
@@ -560,5 +560,6 @@ sysview_engine_new(void)
 
 	sysview->base.vtab = &sysview_engine_vtab;
 	sysview->base.name = "sysview";
+	sysview->base.flags = ENGINE_BYPASS_TX;
 	return sysview;
 }
diff --git a/src/box/txn.c b/src/box/txn.c
index 11de1a3bee08d1d806da8524d02cbc8e508adb30..a35770cff8f07ff8f08354351f74e53434345e22 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -116,16 +116,15 @@ txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp)
 	stailq_cut_tail(&txn->stmts, svp, &rollback);
 	stailq_reverse(&rollback);
 	stailq_foreach_entry(stmt, &rollback, next) {
-		if (stmt->space != NULL) {
+		if (txn->engine != NULL && stmt->space != NULL)
 			engine_rollback_statement(txn->engine, txn, stmt);
-			stmt->space = NULL;
-		}
 		if (stmt->row != NULL) {
 			assert(txn->n_rows > 0);
 			txn->n_rows--;
-			stmt->row = NULL;
 		}
 		txn_stmt_unref_tuples(stmt);
+		stmt->space = NULL;
+		stmt->row = NULL;
 	}
 }
 
@@ -159,6 +158,8 @@ txn_begin(bool is_autocommit)
 int
 txn_begin_in_engine(struct engine *engine, struct txn *txn)
 {
+	if (engine->flags & ENGINE_BYPASS_TX)
+		return 0;
 	if (txn->engine == NULL) {
 		txn->engine = engine;
 		return engine_begin(engine, txn);
diff --git a/src/box/txn.h b/src/box/txn.h
index 7781e8b0341535d1c92340eb1ccdfa73aa79bb9a..a9f68ed65062b4a2d126303a22c2444a10381794 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -271,7 +271,6 @@ txn_commit_ro_stmt(struct txn *txn)
 {
 	assert(txn == in_txn());
 	if (txn) {
-		assert(txn->engine);
 		/* nothing to do */
 	} else {
 		fiber_gc();
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index c20f8038f1e58aeee5d80d1015b1ee161a756129..3843cad6e538a61ecd0a1d89014087b8ff0df269 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -124,6 +124,8 @@ static const char *vy_log_type_name[] = {
 	[VY_LOG_MODIFY_LSM]		= "modify_lsm",
 	[VY_LOG_FORGET_LSM]		= "forget_lsm",
 	[VY_LOG_PREPARE_LSM]		= "prepare_lsm",
+	[VY_LOG_REBOOTSTRAP]		= "rebootstrap",
+	[VY_LOG_ABORT_REBOOTSTRAP]	= "abort_rebootstrap",
 };
 
 /** Metadata log object. */
@@ -184,6 +186,12 @@ static int
 vy_recovery_process_record(struct vy_recovery *recovery,
 			   const struct vy_log_record *record);
 
+static int
+vy_log_create(const struct vclock *vclock, struct vy_recovery *recovery);
+
+int
+vy_log_rotate(const struct vclock *vclock);
+
 /**
  * Return the name of the vylog file that has the given signature.
  */
@@ -846,17 +854,43 @@ vy_log_next_id(void)
 	return vy_log.next_id++;
 }
 
+/**
+ * If a vylog file already exists, we are doing a rebootstrap:
+ * - Load the vylog to find out the id to start indexing new
+ *   objects with.
+ * - Mark the beginning of a new rebootstrap attempt by writing
+ *   VY_LOG_REBOOTSTRAP record.
+ */
+static int
+vy_log_rebootstrap(void)
+{
+	struct vy_recovery *recovery;
+	recovery = vy_recovery_new(vclock_sum(&vy_log.last_checkpoint),
+				   VY_RECOVERY_ABORT_REBOOTSTRAP);
+	if (recovery == NULL)
+		return -1;
+
+	vy_log.next_id = recovery->max_id + 1;
+	vy_recovery_delete(recovery);
+
+	struct vy_log_record record;
+	vy_log_record_init(&record);
+	record.type = VY_LOG_REBOOTSTRAP;
+	vy_log_tx_begin();
+	vy_log_write(&record);
+	if (vy_log_tx_commit() != 0)
+		return -1;
+
+	return 0;
+}
+
 int
 vy_log_bootstrap(void)
 {
-	/*
-	 * Scan the directory to make sure there is no
-	 * vylog files left from previous setups.
-	 */
 	if (xdir_scan(&vy_log.dir) < 0 && errno != ENOENT)
 		return -1;
-	if (xdir_last_vclock(&vy_log.dir, NULL) >= 0)
-		panic("vinyl directory is not empty");
+	if (xdir_last_vclock(&vy_log.dir, &vy_log.last_checkpoint) >= 0)
+		return vy_log_rebootstrap();
 
 	/* Add initial vclock to the xdir. */
 	struct vclock *vclock = malloc(sizeof(*vclock));
@@ -883,10 +917,11 @@ vy_log_begin_recovery(const struct vclock *vclock)
 	if (xdir_scan(&vy_log.dir) < 0 && errno != ENOENT)
 		return NULL;
 
-	struct vclock vy_log_vclock;
-	vclock_create(&vy_log_vclock);
-	if (xdir_last_vclock(&vy_log.dir, &vy_log_vclock) >= 0 &&
-	    vclock_compare(&vy_log_vclock, vclock) > 0) {
+	if (xdir_last_vclock(&vy_log.dir, &vy_log.last_checkpoint) < 0)
+		vclock_copy(&vy_log.last_checkpoint, vclock);
+
+	int cmp = vclock_compare(&vy_log.last_checkpoint, vclock);
+	if (cmp > 0) {
 		/*
 		 * Last vy_log log is newer than the last snapshot.
 		 * This can't normally happen, as vy_log is rotated
@@ -896,21 +931,45 @@ vy_log_begin_recovery(const struct vclock *vclock)
 		diag_set(ClientError, ER_MISSING_SNAPSHOT);
 		return NULL;
 	}
+	if (cmp < 0) {
+		/*
+		 * Last vy_log log is older than the last snapshot.
+		 * This happens if we are recovering from a backup.
+		 * Rotate the log to keep its signature in sync with
+		 * checkpoint.
+		 */
+		if (vy_log_rotate(vclock) != 0)
+			return NULL;
+	}
 
+	/*
+	 * If we are recovering from a vylog that has an unfinished
+	 * rebootstrap section, checkpoint (and hence rebootstrap)
+	 * failed, and we need to mark rebootstrap as aborted.
+	 */
 	struct vy_recovery *recovery;
-	recovery = vy_recovery_new(vclock_sum(&vy_log_vclock), 0);
+	recovery = vy_recovery_new(vclock_sum(&vy_log.last_checkpoint),
+				   VY_RECOVERY_ABORT_REBOOTSTRAP);
 	if (recovery == NULL)
 		return NULL;
 
+	if (recovery->in_rebootstrap) {
+		struct vy_log_record record;
+		vy_log_record_init(&record);
+		record.type = VY_LOG_ABORT_REBOOTSTRAP;
+		vy_log_tx_begin();
+		vy_log_write(&record);
+		if (vy_log_tx_commit() != 0) {
+			vy_recovery_delete(recovery);
+			return NULL;
+		}
+	}
+
 	vy_log.next_id = recovery->max_id + 1;
 	vy_log.recovery = recovery;
-	vclock_copy(&vy_log.last_checkpoint, vclock);
 	return recovery;
 }
 
-static int
-vy_log_create(const struct vclock *vclock, struct vy_recovery *recovery);
-
 int
 vy_log_end_recovery(void)
 {
@@ -931,34 +990,6 @@ vy_log_end_recovery(void)
 		return -1;
 	}
 
-	/*
-	 * On backup we copy files corresponding to the most recent
-	 * checkpoint. Since vy_log does not create snapshots of its log
-	 * files, but instead appends records written after checkpoint
-	 * to the most recent log file, the signature of the vy_log file
-	 * corresponding to the last checkpoint equals the signature
-	 * of the previous checkpoint. So upon successful recovery
-	 * from a backup we need to rotate the log to keep checkpoint
-	 * and vy_log signatures in sync.
-	 */
-	struct vclock *vclock = vclockset_last(&vy_log.dir.index);
-	if (vclock == NULL ||
-	    vclock_compare(vclock, &vy_log.last_checkpoint) != 0) {
-		vclock = malloc(sizeof(*vclock));
-		if (vclock == NULL) {
-			diag_set(OutOfMemory, sizeof(*vclock),
-				 "malloc", "struct vclock");
-			return -1;
-		}
-		vclock_copy(vclock, &vy_log.last_checkpoint);
-		xdir_add_vclock(&vy_log.dir, vclock);
-		if (vy_log_create(vclock, vy_log.recovery) < 0) {
-			diag_log();
-			say_error("failed to write `%s'",
-				  vy_log_filename(vclock_sum(vclock)));
-			return -1;
-		}
-	}
 	xdir_collect_inprogress(&vy_log.dir);
 	vy_log.recovery = NULL;
 	return 0;
@@ -1307,6 +1338,7 @@ vy_recovery_do_create_lsm(struct vy_recovery *recovery, int64_t id,
 	 * before the final version.
 	 */
 	rlist_add_tail_entry(&recovery->lsms, lsm, in_recovery);
+	lsm->in_rebootstrap = recovery->in_rebootstrap;
 	if (recovery->max_id < id)
 		recovery->max_id = id;
 	return lsm;
@@ -1889,6 +1921,42 @@ vy_recovery_delete_slice(struct vy_recovery *recovery, int64_t slice_id)
 	return 0;
 }
 
+/**
+ * Mark all LSM trees created during rebootstrap as dropped so
+ * that they will be purged on the next garbage collection.
+ */
+static void
+vy_recovery_do_abort_rebootstrap(struct vy_recovery *recovery)
+{
+	struct vy_lsm_recovery_info *lsm;
+	rlist_foreach_entry(lsm, &recovery->lsms, in_recovery) {
+		if (lsm->in_rebootstrap) {
+			lsm->in_rebootstrap = false;
+			lsm->create_lsn = -1;
+			lsm->modify_lsn = -1;
+			lsm->drop_lsn = 0;
+		}
+	}
+}
+
+/** Handle a VY_LOG_REBOOTSTRAP log record. */
+static void
+vy_recovery_rebootstrap(struct vy_recovery *recovery)
+{
+	if (recovery->in_rebootstrap)
+		vy_recovery_do_abort_rebootstrap(recovery);
+	recovery->in_rebootstrap = true;
+}
+
+/** Handle VY_LOG_ABORT_REBOOTSTRAP record. */
+static void
+vy_recovery_abort_rebootstrap(struct vy_recovery *recovery)
+{
+	if (recovery->in_rebootstrap)
+		vy_recovery_do_abort_rebootstrap(recovery);
+	recovery->in_rebootstrap = false;
+}
+
 /**
  * Update a recovery context with a new log record.
  * Return 0 on success, -1 on failure.
@@ -1900,7 +1968,7 @@ static int
 vy_recovery_process_record(struct vy_recovery *recovery,
 			   const struct vy_log_record *record)
 {
-	int rc;
+	int rc = 0;
 	switch (record->type) {
 	case VY_LOG_PREPARE_LSM:
 		rc = vy_recovery_prepare_lsm(recovery, record->lsm_id,
@@ -1965,6 +2033,12 @@ vy_recovery_process_record(struct vy_recovery *recovery,
 		/* Not used anymore, ignore. */
 		rc = 0;
 		break;
+	case VY_LOG_REBOOTSTRAP:
+		vy_recovery_rebootstrap(recovery);
+		break;
+	case VY_LOG_ABORT_REBOOTSTRAP:
+		vy_recovery_abort_rebootstrap(recovery);
+		break;
 	default:
 		unreachable();
 	}
@@ -1974,6 +2048,26 @@ vy_recovery_process_record(struct vy_recovery *recovery,
 	return rc;
 }
 
+/**
+ * Commit the last rebootstrap attempt - drop all objects created
+ * before rebootstrap.
+ */
+static void
+vy_recovery_commit_rebootstrap(struct vy_recovery *recovery)
+{
+	assert(recovery->in_rebootstrap);
+	struct vy_lsm_recovery_info *lsm;
+	rlist_foreach_entry(lsm, &recovery->lsms, in_recovery) {
+		if (!lsm->in_rebootstrap && lsm->drop_lsn < 0) {
+			/*
+			 * The files will be removed when the current
+			 * checkpoint is purged by garbage collector.
+			 */
+			lsm->drop_lsn = vy_log_signature();
+		}
+	}
+}
+
 /**
  * Fill index_id_hash with LSM trees recovered from vylog.
  */
@@ -2065,6 +2159,7 @@ vy_recovery_new_f(va_list ap)
 	recovery->run_hash = NULL;
 	recovery->slice_hash = NULL;
 	recovery->max_id = -1;
+	recovery->in_rebootstrap = false;
 
 	recovery->index_id_hash = mh_i64ptr_new();
 	recovery->lsm_hash = mh_i64ptr_new();
@@ -2118,6 +2213,13 @@ vy_recovery_new_f(va_list ap)
 
 	xlog_cursor_close(&cursor, false);
 
+	if (recovery->in_rebootstrap) {
+		if ((flags & VY_RECOVERY_ABORT_REBOOTSTRAP) != 0)
+			vy_recovery_do_abort_rebootstrap(recovery);
+		else
+			vy_recovery_commit_rebootstrap(recovery);
+	}
+
 	if (vy_recovery_build_index_id_hash(recovery) != 0)
 		goto fail_free;
 out:
diff --git a/src/box/vy_log.h b/src/box/vy_log.h
index 98cbf6ee621dae54cc61fd3f8ffe4371a68239d9..7718d9c689d7a753c41c5dc36246d0d4a0301a18 100644
--- a/src/box/vy_log.h
+++ b/src/box/vy_log.h
@@ -196,6 +196,27 @@ enum vy_log_record_type {
 	 * a VY_LOG_CREATE_LSM record to commit it.
 	 */
 	VY_LOG_PREPARE_LSM		= 15,
+	/**
+	 * This record denotes the beginning of a rebootstrap section.
+	 * A rebootstrap section ends either by another record of this
+	 * type or by VY_LOG_ABORT_REBOOTSTRAP or at the end of the file.
+	 * All objects created between a VY_LOG_REBOOTSTRAP record and
+	 * VY_LOG_ABORT_REBOOTSTRAP or another VY_LOG_REBOOTSTRAP are
+	 * considered to be garbage and marked as dropped on recovery.
+	 *
+	 * We write a record of this type if a vylog file already exists
+	 * at bootstrap time, which means we are going to rebootstrap.
+	 * If rebootstrap succeeds, we rotate the vylog on checkpoint and
+	 * mark all objects written before the last VY_LOG_REBOOTSTRAP
+	 * record as dropped in the rotated vylog. If rebootstrap fails,
+	 * we write VY_LOG_ABORT_REBOOTSTRAP on recovery.
+	 */
+	VY_LOG_REBOOTSTRAP		= 16,
+	/**
+	 * This record is written on recovery if rebootstrap failed.
+	 * See also VY_LOG_REBOOTSTRAP.
+	 */
+	VY_LOG_ABORT_REBOOTSTRAP	= 17,
 
 	vy_log_record_type_MAX
 };
@@ -276,6 +297,12 @@ struct vy_recovery {
 	 * or -1 in case no vinyl objects were recovered.
 	 */
 	int64_t max_id;
+	/**
+	 * Set if we are currently processing a rebootstrap section,
+	 * i.e. we encountered a VY_LOG_REBOOTSTRAP record and haven't
+	 * seen matching VY_LOG_ABORT_REBOOTSTRAP.
+	 */
+	bool in_rebootstrap;
 };
 
 /** LSM tree info stored in a recovery context. */
@@ -326,6 +353,8 @@ struct vy_lsm_recovery_info {
 	 * this one after successful ALTER.
 	 */
 	struct vy_lsm_recovery_info *prepared;
+	/** Set if this LSM tree was created during rebootstrap. */
+	bool in_rebootstrap;
 };
 
 /** Vinyl range info stored in a recovery context. */
@@ -533,6 +562,11 @@ enum vy_recovery_flag {
 	 * of the last checkpoint.
 	 */
 	VY_RECOVERY_LOAD_CHECKPOINT	= 1 << 0,
+	/**
+	 * Consider the last attempt to rebootstrap aborted even if
+	 * there's no VY_LOG_ABORT_REBOOTSTRAP record.
+	 */
+	VY_RECOVERY_ABORT_REBOOTSTRAP	= 1 << 1,
 };
 
 /**
diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c
index e5fdaed19b0bbc67ea200cece8d3e494eaed2a90..5c186b87b2e60161e6bd0f27329df5e4cbbf9df9 100644
--- a/src/box/vy_tx.c
+++ b/src/box/vy_tx.c
@@ -170,10 +170,6 @@ tx_manager_read_view(struct tx_manager *xm)
 		rv->vlsn = xm->lsn;
 		rv->refs = 1;
 	}
-	/*
-	 * Add to the tail of the list, so that tx_manager_vlsn()
-	 * works correctly.
-	 */
 	rlist_add_tail_entry(&xm->read_views, rv, in_read_views);
 	return rv;
 }
@@ -193,17 +189,6 @@ tx_manager_destroy_read_view(struct tx_manager *xm,
 	}
 }
 
-int64_t
-tx_manager_vlsn(struct tx_manager *xm)
-{
-	if (rlist_empty(&xm->read_views))
-		return xm->lsn;
-	struct vy_read_view *oldest = rlist_first_entry(&xm->read_views,
-							struct vy_read_view,
-							in_read_views);
-	return oldest->vlsn;
-}
-
 static struct txv *
 txv_new(struct vy_tx *tx, struct vy_lsm *lsm, struct tuple *stmt)
 {
diff --git a/src/box/vy_tx.h b/src/box/vy_tx.h
index 9c35e2bd76eb91cc60a74c421eef2ea1ce6912bd..dcf6a73982d1911ee122d96d3609b97b9e668414 100644
--- a/src/box/vy_tx.h
+++ b/src/box/vy_tx.h
@@ -262,18 +262,6 @@ tx_manager_new(void);
 void
 tx_manager_delete(struct tx_manager *xm);
 
-/*
- * Determine the lowest possible vlsn, i.e. the level below
- * which the history could be compacted.
- *
- * If there are active read views, it is the first's vlsn.
- * If there is no active read view, a read view could be
- * created at any moment with vlsn = m->lsn, so m->lsn must
- * be chosen.
- */
-int64_t
-tx_manager_vlsn(struct tx_manager *xm);
-
 /** Initialize a tx object. */
 void
 vy_tx_create(struct tx_manager *xm, struct vy_tx *tx);
diff --git a/src/errinj.h b/src/errinj.h
index cde58d48515f8bc83ff20eb344755158ebce9506..64d13b0210afa286ec7c63bea51b1e1302499017 100644
--- a/src/errinj.h
+++ b/src/errinj.h
@@ -97,6 +97,7 @@ struct errinj {
 	_(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE, {.dparam = 0}) \
 	_(ERRINJ_RELAY_REPORT_INTERVAL, ERRINJ_DOUBLE, {.dparam = 0}) \
 	_(ERRINJ_RELAY_FINAL_SLEEP, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_RELAY_FINAL_JOIN, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_PORT_DUMP, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_XLOG_GARBAGE, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_XLOG_META, ERRINJ_BOOL, {.bparam = false}) \
diff --git a/src/lua/fio.lua b/src/lua/fio.lua
index 3fda6e5322929e9bfb6914b322cac185e31069de..d5531c1542c6ca1e69cc5dc5ac5b0838b7cab3ea 100644
--- a/src/lua/fio.lua
+++ b/src/lua/fio.lua
@@ -382,8 +382,12 @@ fio.rmtree = function(path)
     for i, f in ipairs(ls) do
         local tmppath = fio.pathjoin(path, f)
         local st = fio.stat(tmppath)
-        if st and st:is_dir() then
-            st, err = fio.rmtree(tmppath)
+        if st then
+            if st:is_dir() then
+                st, err = fio.rmtree(tmppath)
+            else
+                st, err = fio.unlink(tmppath)
+            end
             if err ~= nil  then
                 return nil, err
             end
diff --git a/test/app/fio.result b/test/app/fio.result
index 6c4609855f0dc2350a1d8ecdb7e6fb22b401c464..3f8d77e25f0471818e44f235a00a1e2ce22d9a54 100644
--- a/test/app/fio.result
+++ b/test/app/fio.result
@@ -413,6 +413,27 @@ fio.rmdir(dir2)
   - false
   - 'fio: No such file or directory'
 ...
+-- gh-3258 rmtree should remove directories with files
+fio.mktree('tmp2/tmp3/tmp4')
+---
+- true
+...
+fh = fio.open('tmp2/tmp3/tmp4/tmp.txt', {'O_RDWR', 'O_CREAT'})
+---
+...
+fh:close()
+---
+- true
+...
+fio.rmtree('tmp2')
+---
+- true
+...
+fio.stat('tmp2')
+---
+- null
+- 'fio: No such file or directory'
+...
 fio.rmdir(tmpdir)
 ---
 - true
diff --git a/test/app/fio.test.lua b/test/app/fio.test.lua
index 0850413d9b96cccf05d7d73e743ef9e1e836658a..c2c8e689b4a331236bfd4b97426e04b6583136cc 100644
--- a/test/app/fio.test.lua
+++ b/test/app/fio.test.lua
@@ -131,6 +131,14 @@ fio.rmdir(dir2)
 
 { fio.unlink(file1), fio.unlink(file2), fio.unlink(file3), fio.unlink(file4) }
 { fio.unlink(file1), fio.unlink(file2), fio.unlink(file3), fio.unlink(file4) }
+
+-- gh-3258 rmtree should remove directories with files
+fio.mktree('tmp2/tmp3/tmp4')
+fh = fio.open('tmp2/tmp3/tmp4/tmp.txt', {'O_RDWR', 'O_CREAT'})
+fh:close()
+fio.rmtree('tmp2')
+fio.stat('tmp2')
+
 fio.rmdir(tmpdir)
 fio.rmdir(tmpdir)
 
diff --git a/test/box/access.result b/test/box/access.result
index a1f3e996ac9cc1717e23937f534d521be57c3e9c..f4669a4a3cc37027e38c56d521dffd7cff8d81df 100644
--- a/test/box/access.result
+++ b/test/box/access.result
@@ -875,7 +875,12 @@ session = box.session
 box.schema.user.create('test')
 ---
 ...
-box.schema.user.grant('test', 'read,write,create', 'universe')
+box.schema.user.grant('test', 'read', 'space', '_collation')
+---
+...
+--box.schema.user.grant('test', 'write', 'space', '_collation')
+-- FIXME: granting create on 'collation' only doesn't work
+box.schema.user.grant('test', 'create', 'universe')
 ---
 ...
 session.su('test')
@@ -1389,7 +1394,10 @@ box.schema.func.create('test_func')
 box.session.su("admin")
 ---
 ...
-box.schema.user.grant("tester", "read", "universe")
+box.schema.user.grant("tester", "read", "space", "_user")
+---
+...
+box.schema.user.grant("tester", "read", "space", "_func")
 ---
 ...
 -- failed create
@@ -1416,7 +1424,20 @@ box.session.su("admin")
 -- explicitly since we still use process_rw to write to system
 -- tables from ddl
 --
-box.schema.user.grant("tester", "create,write", "universe")
+box.schema.user.grant('tester', 'write', 'universe')
+---
+...
+-- no entity user currently, so have to grant create
+-- on universe in order to create a user.
+box.schema.user.grant('tester', 'create', 'universe')
+---
+...
+-- this should work instead:
+--box.schema.user.grant('tester', 'create', 'user')
+--box.schema.user.grant('tester', 'create', 'space')
+--box.schema.user.grant('tester', 'create', 'function')
+--box.schema.user.grant('tester', 'create' , 'sequence')
+box.schema.user.grant('tester', 'read', 'space', '_sequence')
 ---
 ...
 box.session.su("tester")
@@ -1824,7 +1845,7 @@ _  = box.schema.sequence.create('test_sequence')
 box.session.su('admin')
 ---
 ...
-box.schema.user.grant('tester', 'create', 'universe')
+box.schema.user.grant('tester', 'create', 'sequence')
 ---
 ...
 box.session.su('tester')
diff --git a/test/box/access.test.lua b/test/box/access.test.lua
index fb8f744e8ae6708cbd97642c40f81cda26983dac..9ae0e1114837d6a0352f43f69b5b3f08ee9b4e82 100644
--- a/test/box/access.test.lua
+++ b/test/box/access.test.lua
@@ -340,7 +340,10 @@ c:close()
 
 session = box.session
 box.schema.user.create('test')
-box.schema.user.grant('test', 'read,write,create', 'universe')
+box.schema.user.grant('test', 'read', 'space', '_collation')
+--box.schema.user.grant('test', 'write', 'space', '_collation')
+-- FIXME: granting create on 'collation' only doesn't work
+box.schema.user.grant('test', 'create', 'universe')
 session.su('test')
 box.internal.collation.create('test', 'ICU', 'ru_RU')
 session.su('admin')
@@ -520,7 +523,8 @@ box.schema.space.create("test_space")
 box.schema.user.create('test_user')
 box.schema.func.create('test_func')
 box.session.su("admin")
-box.schema.user.grant("tester", "read", "universe")
+box.schema.user.grant("tester", "read", "space", "_user")
+box.schema.user.grant("tester", "read", "space", "_func")
 -- failed create
 box.session.su("tester")
 box.schema.space.create("test_space")
@@ -533,7 +537,16 @@ box.session.su("admin")
 -- explicitly since we still use process_rw to write to system
 -- tables from ddl
 --
-box.schema.user.grant("tester", "create,write", "universe")
+box.schema.user.grant('tester', 'write', 'universe')
+-- no entity user currently, so have to grant create
+-- on universe in order to create a user.
+box.schema.user.grant('tester', 'create', 'universe')
+-- this should work instead:
+--box.schema.user.grant('tester', 'create', 'user')
+--box.schema.user.grant('tester', 'create', 'space')
+--box.schema.user.grant('tester', 'create', 'function')
+--box.schema.user.grant('tester', 'create' , 'sequence')
+box.schema.user.grant('tester', 'read', 'space', '_sequence')
 box.session.su("tester")
 -- successful create
 s1 = box.schema.space.create("test_space")
@@ -712,7 +725,7 @@ box.schema.user.grant('tester', 'read,write', 'space', '_sequence')
 box.session.su('tester')
 _  = box.schema.sequence.create('test_sequence')
 box.session.su('admin')
-box.schema.user.grant('tester', 'create', 'universe')
+box.schema.user.grant('tester', 'create', 'sequence')
 box.session.su('tester')
 _ = box.schema.sequence.create('test_sequence')
 box.session.su('admin')
diff --git a/test/box/blackhole.result b/test/box/blackhole.result
new file mode 100644
index 0000000000000000000000000000000000000000..945b2755cd5a4fa8c93b6dd263dea25076b300b0
--- /dev/null
+++ b/test/box/blackhole.result
@@ -0,0 +1,229 @@
+test_run = require('test_run').new()
+---
+...
+s = box.schema.space.create('test', {engine = 'blackhole'})
+---
+...
+-- Blackhole doesn't support indexes.
+s:create_index('pk')
+---
+- error: Blackhole does not support indexes
+...
+-- Blackhole does support space format.
+s:format{{'key', 'unsigned'}, {'value', 'string'}}
+---
+...
+s:format()
+---
+- [{'name': 'key', 'type': 'unsigned'}, {'name': 'value', 'type': 'string'}]
+...
+t = s:insert{1, 'a'} -- ok
+---
+...
+t, t.key, t.value
+---
+- [1, 'a']
+- 1
+- a
+...
+s:insert{1, 2, 3} -- error
+---
+- error: 'Tuple field 2 type does not match one required by operation: expected string'
+...
+s:replace{'a', 'b', 'c'} -- error
+---
+- error: 'Tuple field 1 type does not match one required by operation: expected unsigned'
+...
+s:format{}
+---
+...
+s:insert{1, 2, 3} -- ok
+---
+- [1, 2, 3]
+...
+s:replace{'a', 'b', 'c'} -- ok
+---
+- ['a', 'b', 'c']
+...
+-- Blackhole doesn't support delete/update/upsert operations.
+box.internal.delete(s.id, 0, {})
+---
+- error: Blackhole does not support delete()
+...
+box.internal.update(s.id, 0, {}, {})
+---
+- error: Blackhole does not support update()
+...
+box.internal.upsert(s.id, {}, {})
+---
+- error: Blackhole does not support upsert()
+...
+-- Blackhole supports on_replace and before_replace triggers.
+s_old = nil
+---
+...
+s_new = nil
+---
+...
+f1 = s:on_replace(function(old, new) s_old = old s_new = new end)
+---
+...
+s:replace{1, 2, 3}
+---
+- [1, 2, 3]
+...
+s_old, s_new
+---
+- null
+- [1, 2, 3]
+...
+f2 = s:before_replace(function(old, new) return box.tuple.new{4, 5, 6} end)
+---
+...
+s:replace{1, 2, 3}
+---
+- [4, 5, 6]
+...
+s_old, s_new
+---
+- null
+- [4, 5, 6]
+...
+s:on_replace(nil, f1)
+---
+...
+s:before_replace(nil, f2)
+---
+...
+-- Blackhole statements can be mixed in other engines' transactions.
+memtx = box.schema.space.create('memtx', {engine = 'memtx'})
+---
+...
+_ = memtx:create_index('pk')
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+box.begin()
+s:replace{1}
+memtx:replace{1}
+s:replace{2}
+memtx:replace{2}
+box.commit();
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+memtx:select()
+---
+- - [1]
+  - [2]
+...
+f = s:on_replace(function(old, new) memtx:replace(new) end)
+---
+...
+s:replace{3}
+---
+- [3]
+...
+s:replace{4}
+---
+- [4]
+...
+memtx:select()
+---
+- - [1]
+  - [2]
+  - [3]
+  - [4]
+...
+s:on_replace(nil, f)
+---
+...
+memtx:drop()
+---
+...
+-- Test recovery.
+test_run:cmd('restart server default')
+s = box.space.test
+---
+...
+-- Test snapshot.
+box.snapshot()
+---
+- ok
+...
+-- Operations done on a blackhole space are written to the WAL
+-- and therefore get replicated. Check it with the aid of an
+-- on_replace trigger.
+box.schema.user.grant('guest', 'replication')
+---
+...
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+t = {}
+---
+...
+_ = box.space.test:on_replace(function(old, new) table.insert(t, new) end)
+---
+...
+test_run:cmd('switch default')
+---
+- true
+...
+s = box.space.test
+---
+...
+for i = 1, 5 do s:replace{i} end
+---
+...
+vclock = test_run:get_vclock('default')
+---
+...
+test_run:wait_vclock('replica', vclock)
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+t
+---
+- - [1]
+  - [2]
+  - [3]
+  - [4]
+  - [5]
+...
+test_run:cmd('switch default')
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+s:drop()
+---
+...
diff --git a/test/box/blackhole.test.lua b/test/box/blackhole.test.lua
new file mode 100644
index 0000000000000000000000000000000000000000..bcf05f3a648659c0ae9ecaab9469ded807693860
--- /dev/null
+++ b/test/box/blackhole.test.lua
@@ -0,0 +1,83 @@
+test_run = require('test_run').new()
+
+s = box.schema.space.create('test', {engine = 'blackhole'})
+
+-- Blackhole doesn't support indexes.
+s:create_index('pk')
+
+-- Blackhole does support space format.
+s:format{{'key', 'unsigned'}, {'value', 'string'}}
+s:format()
+t = s:insert{1, 'a'} -- ok
+t, t.key, t.value
+s:insert{1, 2, 3} -- error
+s:replace{'a', 'b', 'c'} -- error
+s:format{}
+s:insert{1, 2, 3} -- ok
+s:replace{'a', 'b', 'c'} -- ok
+
+-- Blackhole doesn't support delete/update/upsert operations.
+box.internal.delete(s.id, 0, {})
+box.internal.update(s.id, 0, {}, {})
+box.internal.upsert(s.id, {}, {})
+
+-- Blackhole supports on_replace and before_replace triggers.
+s_old = nil
+s_new = nil
+f1 = s:on_replace(function(old, new) s_old = old s_new = new end)
+s:replace{1, 2, 3}
+s_old, s_new
+f2 = s:before_replace(function(old, new) return box.tuple.new{4, 5, 6} end)
+s:replace{1, 2, 3}
+s_old, s_new
+s:on_replace(nil, f1)
+s:before_replace(nil, f2)
+
+-- Blackhole statements can be mixed in other engines' transactions.
+memtx = box.schema.space.create('memtx', {engine = 'memtx'})
+_ = memtx:create_index('pk')
+test_run:cmd("setopt delimiter ';'")
+box.begin()
+s:replace{1}
+memtx:replace{1}
+s:replace{2}
+memtx:replace{2}
+box.commit();
+test_run:cmd("setopt delimiter ''");
+memtx:select()
+f = s:on_replace(function(old, new) memtx:replace(new) end)
+s:replace{3}
+s:replace{4}
+memtx:select()
+s:on_replace(nil, f)
+memtx:drop()
+
+-- Test recovery.
+test_run:cmd('restart server default')
+s = box.space.test
+
+-- Test snapshot.
+box.snapshot()
+
+-- Operations done on a blackhole space are written to the WAL
+-- and therefore get replicated. Check it with the aid of an
+-- on_replace trigger.
+box.schema.user.grant('guest', 'replication')
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+t = {}
+_ = box.space.test:on_replace(function(old, new) table.insert(t, new) end)
+test_run:cmd('switch default')
+s = box.space.test
+for i = 1, 5 do s:replace{i} end
+vclock = test_run:get_vclock('default')
+test_run:wait_vclock('replica', vclock)
+test_run:cmd("switch replica")
+t
+test_run:cmd('switch default')
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+box.schema.user.revoke('guest', 'replication')
+
+s:drop()
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 54b6d578f48826c392cb14e9bf509826d2294639..c6b2bbac2a72278bb934246289628e176e1237de 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -60,13 +60,15 @@ errinj.info()
     state: false
   ERRINJ_WAL_WRITE_DISK:
     state: false
+  ERRINJ_VY_LOG_FILE_RENAME:
+    state: false
   ERRINJ_VY_RUN_WRITE:
     state: false
-  ERRINJ_VY_LOG_FILE_RENAME:
+  ERRINJ_HTTP_RESPONSE_ADD_WAIT:
     state: false
   ERRINJ_VY_LOG_FLUSH_DELAY:
     state: false
-  ERRINJ_HTTP_RESPONSE_ADD_WAIT:
+  ERRINJ_RELAY_FINAL_JOIN:
     state: false
   ERRINJ_SNAP_COMMIT_DELAY:
     state: false
diff --git a/test/box/on_replace.result b/test/box/on_replace.result
index fcdb43794636a2a329e312d3aebf0a6adf27ac54..f2de06f9095d7451fe3fd2b7ad4939769c34b486 100644
--- a/test/box/on_replace.result
+++ b/test/box/on_replace.result
@@ -471,21 +471,21 @@ t = s:on_replace(function () s:create_index('sec') end, t)
 ...
 s:replace({2, 3})
 ---
-- error: A multi-statement transaction can not use multiple storage engines
+- error: DDL does not support multi-statement transactions
 ...
 t = s:on_replace(function () box.schema.user.create('newu') end, t)
 ---
 ...
 s:replace({3, 4})
 ---
-- error: A multi-statement transaction can not use multiple storage engines
+- error: Space _user does not support multi-statement transactions
 ...
 t = s:on_replace(function () box.schema.role.create('newr') end, t)
 ---
 ...
 s:replace({4, 5})
 ---
-- error: A multi-statement transaction can not use multiple storage engines
+- error: Space _user does not support multi-statement transactions
 ...
 t = s:on_replace(function () box.space._user:delete{box.schema.GUEST_ID} end, t)
 ---
@@ -506,21 +506,21 @@ t = s:on_replace(function () s:drop() end, t)
 ...
 s:replace({5, 6})
 ---
-- error: A multi-statement transaction can not use multiple storage engines
+- error: DDL does not support multi-statement transactions
 ...
 t = s:on_replace(function () box.schema.func.create('newf') end, t)
 ---
 ...
 s:replace({6, 7})
 ---
-- error: A multi-statement transaction can not use multiple storage engines
+- error: Space _func does not support multi-statement transactions
 ...
 t = s:on_replace(function () box.schema.user.grant('guest', 'read,write', 'space', 'test_on_repl_ddl') end, t)
 ---
 ...
 s:replace({7, 8})
 ---
-- error: A multi-statement transaction can not use multiple storage engines
+- error: Space _priv does not support multi-statement transactions
 ...
 t = s:on_replace(function () s:rename('newname') end, t)
 ---
diff --git a/test/box/sequence.result b/test/box/sequence.result
index cbbd45080235f5cb86619f42e7c383df2a6d9cc9..a2a1a60ea487f5d6a5b2c3b5c10faa86c693624d 100644
--- a/test/box/sequence.result
+++ b/test/box/sequence.result
@@ -1471,8 +1471,17 @@ box.session.su('admin')
 sq:drop()
 ---
 ...
+box.schema.user.revoke('user', 'read,write', 'universe')
+---
+...
 -- A user can alter/use sequences that he owns.
-box.schema.user.grant('user', 'create', 'universe')
+box.schema.user.grant('user', 'create', 'sequence')
+---
+...
+box.schema.user.grant('user', 'write', 'space', '_sequence')
+---
+...
+box.schema.user.grant('user', 'write', 'space', '_sequence_data')
 ---
 ...
 box.session.su('user')
@@ -1493,7 +1502,13 @@ sq = box.schema.sequence.create('seq')
 box.session.su('admin')
 ---
 ...
-box.schema.user.revoke('user', 'read,write,create', 'universe')
+box.schema.user.revoke('user', 'create', 'sequence')
+---
+...
+box.schema.user.revoke('user', 'write', 'space', '_sequence')
+---
+...
+box.schema.user.revoke('user', 'write', 'space', '_sequence_data')
 ---
 ...
 box.session.su('user')
@@ -1515,7 +1530,8 @@ box.session.su('admin')
 sq:drop()
 ---
 ...
--- A sequence can be attached to a space only if the user owns both.
+-- A sequence can be attached to a space only if the user has
+-- create privilege on space and read/write on sequence.
 sq1 = box.schema.sequence.create('seq1')
 ---
 ...
@@ -1525,10 +1541,22 @@ s1 = box.schema.space.create('space1')
 _ = s1:create_index('pk')
 ---
 ...
-box.schema.user.grant('user', 'read,write', 'universe')
+box.schema.user.grant('user', 'write', 'space', '_sequence')
+---
+...
+box.schema.user.grant('user', 'write', 'space', '_sequence_data')
+---
+...
+box.schema.user.grant('user', 'write', 'space', '_schema')
+---
+...
+box.schema.user.grant('user', 'write', 'space', '_space')
 ---
 ...
-box.schema.user.grant('user', 'create', 'universe')
+box.schema.user.grant('user', 'create', 'space')
+---
+...
+box.schema.user.grant('user', 'create', 'sequence')
 ---
 ...
 box.session.su('user')
@@ -1540,17 +1568,50 @@ sq2 = box.schema.sequence.create('seq2')
 s2 = box.schema.space.create('space2')
 ---
 ...
--- fixme: no error on using another user's sequence
-_ = s2:create_index('pk', {sequence = 'seq1'})
+box.session.su('admin')
+---
+...
+box.schema.user.revoke('user', 'create', 'sequence')
+---
+...
+box.schema.user.revoke('user', 'write', 'space', '_sequence')
+---
+...
+box.schema.user.revoke('user', 'write', 'space', '_sequence_data')
 ---
 ...
+box.schema.user.revoke('user', 'write', 'space', '_schema')
+---
+...
+box.schema.user.revoke('user', 'write', 'space', '_space')
+---
+...
+box.schema.user.grant('user', 'write', 'space', '_index')
+---
+...
+box.schema.user.grant('user', 'write', 'space', '_space_sequence')
+---
+...
+box.schema.user.grant('user', 'read', 'space', '_index')
+---
+...
+box.schema.user.grant('user', 'read', 'space', '_space_sequence')
+---
+...
+box.session.su('user')
+---
+...
+_ = s2:create_index('pk', {sequence = 'seq1'}) -- error
+---
+- error: Read access to sequence 'seq1' is denied for user 'user'
+...
 s1.index.pk:alter({sequence = 'seq1'}) -- error
 ---
 - error: Alter access to space 'space1' is denied for user 'user'
 ...
 box.space._space_sequence:replace{s1.id, sq1.id, false} -- error
 ---
-- error: Alter access to space 'space1' is denied for user 'user'
+- error: Read access to sequence 'seq1' is denied for user 'user'
 ...
 box.space._space_sequence:replace{s1.id, sq2.id, false} -- error
 ---
@@ -1558,7 +1619,7 @@ box.space._space_sequence:replace{s1.id, sq2.id, false} -- error
 ...
 box.space._space_sequence:replace{s2.id, sq1.id, false} -- error
 ---
-- error: Alter access to sequence 'seq1' is denied for user 'user'
+- error: Read access to sequence 'seq1' is denied for user 'user'
 ...
 s2.index.pk:alter({sequence = 'seq2'}) -- ok
 ---
@@ -1569,10 +1630,22 @@ box.session.su('admin')
 -- If the user owns a sequence attached to a space,
 -- it can use it for auto increment, otherwise it
 -- needs privileges.
-box.schema.user.revoke('user', 'read,write', 'universe')
+box.schema.user.revoke('user', 'write', 'space', '_index')
+---
+...
+box.schema.user.revoke('user', 'write', 'space', '_space_sequence')
+---
+...
+box.schema.user.revoke('user', 'read', 'space', '_space')
+---
+...
+box.schema.user.revoke('user', 'read', 'space', '_sequence')
+---
+...
+box.schema.user.revoke('user', 'read', 'space', '_index')
 ---
 ...
-box.schema.user.revoke('user', 'create', 'universe')
+box.schema.user.revoke('user', 'read', 'space', '_space_sequence')
 ---
 ...
 box.session.su('user')
@@ -1680,7 +1753,16 @@ s:drop()
 ---
 ...
 -- When a user is dropped, all his sequences are dropped as well.
-box.schema.user.grant('user', 'read,write,create', 'universe')
+box.schema.user.grant('user', 'write', 'space', '_sequence')
+---
+...
+box.schema.user.grant('user', 'read', 'space', '_sequence')
+---
+...
+box.schema.user.grant('user', 'write', 'space', '_space_sequence')
+---
+...
+box.schema.user.grant('user', 'create', 'sequence')
 ---
 ...
 box.session.su('user')
@@ -1710,10 +1792,25 @@ box.schema.user.create('user1')
 box.schema.user.create('user2')
 ---
 ...
-box.schema.user.grant('user1', 'read,write,create', 'universe')
+box.schema.user.grant('user1', 'create', 'sequence')
+---
+...
+box.schema.user.grant('user1', 'write', 'space', '_sequence')
+---
+...
+box.schema.user.grant('user1', 'read', 'space', '_sequence')
+---
+...
+box.schema.user.grant('user1', 'read', 'space', '_user')
+---
+...
+box.schema.user.grant('user1', 'write', 'space', '_sequence_data')
+---
+...
+box.schema.user.grant('user1', 'write', 'space', '_priv')
 ---
 ...
-box.schema.user.grant('user2', 'read,write,create', 'universe')
+box.schema.user.grant('user2', 'read,write', 'universe')
 ---
 ...
 box.session.su('user1')
diff --git a/test/box/sequence.test.lua b/test/box/sequence.test.lua
index c119459b31beaa09ee049c36f7de775761d9c531..96297d6f27ea9257f6dab67ee1a7286eafa0445f 100644
--- a/test/box/sequence.test.lua
+++ b/test/box/sequence.test.lua
@@ -488,16 +488,21 @@ sq:alter{step = 2} -- error
 sq:drop() -- error
 box.session.su('admin')
 sq:drop()
+box.schema.user.revoke('user', 'read,write', 'universe')
 
 -- A user can alter/use sequences that he owns.
-box.schema.user.grant('user', 'create', 'universe')
+box.schema.user.grant('user', 'create', 'sequence')
+box.schema.user.grant('user', 'write', 'space', '_sequence')
+box.schema.user.grant('user', 'write', 'space', '_sequence_data')
 box.session.su('user')
 sq = box.schema.sequence.create('seq')
 sq:alter{step = 2} -- ok
 sq:drop() -- ok
 sq = box.schema.sequence.create('seq')
 box.session.su('admin')
-box.schema.user.revoke('user', 'read,write,create', 'universe')
+box.schema.user.revoke('user', 'create', 'sequence')
+box.schema.user.revoke('user', 'write', 'space', '_sequence')
+box.schema.user.revoke('user', 'write', 'space', '_sequence_data')
 box.session.su('user')
 sq:set(100) -- ok - user owns the sequence
 sq:next() -- ok
@@ -505,17 +510,33 @@ sq:reset() -- ok
 box.session.su('admin')
 sq:drop()
 
--- A sequence can be attached to a space only if the user owns both.
+-- A sequence can be attached to a space only if the user has
+-- create privilege on space and read/write on sequence.
 sq1 = box.schema.sequence.create('seq1')
 s1 = box.schema.space.create('space1')
 _ = s1:create_index('pk')
-box.schema.user.grant('user', 'read,write', 'universe')
-box.schema.user.grant('user', 'create', 'universe')
+box.schema.user.grant('user', 'write', 'space', '_sequence')
+box.schema.user.grant('user', 'write', 'space', '_sequence_data')
+box.schema.user.grant('user', 'write', 'space', '_schema')
+box.schema.user.grant('user', 'write', 'space', '_space')
+box.schema.user.grant('user', 'create', 'space')
+box.schema.user.grant('user', 'create', 'sequence')
 box.session.su('user')
 sq2 = box.schema.sequence.create('seq2')
 s2 = box.schema.space.create('space2')
--- fixme: no error on using another user's sequence
-_ = s2:create_index('pk', {sequence = 'seq1'})
+
+box.session.su('admin')
+box.schema.user.revoke('user', 'create', 'sequence')
+box.schema.user.revoke('user', 'write', 'space', '_sequence')
+box.schema.user.revoke('user', 'write', 'space', '_sequence_data')
+box.schema.user.revoke('user', 'write', 'space', '_schema')
+box.schema.user.revoke('user', 'write', 'space', '_space')
+box.schema.user.grant('user', 'write', 'space', '_index')
+box.schema.user.grant('user', 'write', 'space', '_space_sequence')
+box.schema.user.grant('user', 'read', 'space', '_index')
+box.schema.user.grant('user', 'read', 'space', '_space_sequence')
+box.session.su('user')
+_ = s2:create_index('pk', {sequence = 'seq1'}) -- error
 s1.index.pk:alter({sequence = 'seq1'}) -- error
 box.space._space_sequence:replace{s1.id, sq1.id, false} -- error
 box.space._space_sequence:replace{s1.id, sq2.id, false} -- error
@@ -526,8 +547,12 @@ box.session.su('admin')
 -- If the user owns a sequence attached to a space,
 -- it can use it for auto increment, otherwise it
 -- needs privileges.
-box.schema.user.revoke('user', 'read,write', 'universe')
-box.schema.user.revoke('user', 'create', 'universe')
+box.schema.user.revoke('user', 'write', 'space', '_index')
+box.schema.user.revoke('user', 'write', 'space', '_space_sequence')
+box.schema.user.revoke('user', 'read', 'space', '_space')
+box.schema.user.revoke('user', 'read', 'space', '_sequence')
+box.schema.user.revoke('user', 'read', 'space', '_index')
+box.schema.user.revoke('user', 'read', 'space', '_space_sequence')
 box.session.su('user')
 s2:insert{nil, 1} -- ok: {1, 1}
 box.session.su('admin')
@@ -563,7 +588,10 @@ box.session.su('admin')
 s:drop()
 
 -- When a user is dropped, all his sequences are dropped as well.
-box.schema.user.grant('user', 'read,write,create', 'universe')
+box.schema.user.grant('user', 'write', 'space', '_sequence')
+box.schema.user.grant('user', 'read', 'space', '_sequence')
+box.schema.user.grant('user', 'write', 'space', '_space_sequence')
+box.schema.user.grant('user', 'create', 'sequence')
 box.session.su('user')
 _ = box.schema.sequence.create('test1')
 _ = box.schema.sequence.create('test2')
@@ -575,8 +603,13 @@ box.sequence
 -- to a sequence.
 box.schema.user.create('user1')
 box.schema.user.create('user2')
-box.schema.user.grant('user1', 'read,write,create', 'universe')
-box.schema.user.grant('user2', 'read,write,create', 'universe')
+box.schema.user.grant('user1', 'create', 'sequence')
+box.schema.user.grant('user1', 'write', 'space', '_sequence')
+box.schema.user.grant('user1', 'read', 'space', '_sequence')
+box.schema.user.grant('user1', 'read', 'space', '_user')
+box.schema.user.grant('user1', 'write', 'space', '_sequence_data')
+box.schema.user.grant('user1', 'write', 'space', '_priv')
+box.schema.user.grant('user2', 'read,write', 'universe')
 box.session.su('user1')
 sq = box.schema.sequence.create('test')
 box.session.su('user2')
diff --git a/test/box/transaction.result b/test/box/transaction.result
index 841dcf77eb9f440c4dc4b8ee162277f2f1b09288..e0240842cec3917a846495db5e358961638ea1c7 100644
--- a/test/box/transaction.result
+++ b/test/box/transaction.result
@@ -69,21 +69,21 @@ box.rollback();
 ...
 box.begin() box.schema.func.create('test');
 ---
-- error: A multi-statement transaction can not use multiple storage engines
+- error: Space _func does not support multi-statement transactions
 ...
 box.rollback();
 ---
 ...
 box.begin() box.schema.user.create('test');
 ---
-- error: A multi-statement transaction can not use multiple storage engines
+- error: Space _user does not support multi-statement transactions
 ...
 box.rollback();
 ---
 ...
 box.begin() box.schema.user.grant('guest', 'read', 'space', '_priv');
 ---
-- error: A multi-statement transaction can not use multiple storage engines
+- error: Space _priv does not support multi-statement transactions
 ...
 box.rollback();
 ---
@@ -498,3 +498,34 @@ space:select{}
 space:drop()
 ---
 ...
+--
+-- gh-3528: sysview engine shouldn't participate in transaction control
+--
+space = box.schema.space.create('test', {id = 9000})
+---
+...
+index = space:create_index('primary')
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+box.begin()
+space:auto_increment{box.space._vspace:get{space.id}}
+space:auto_increment{box.space._vindex:get{space.id, index.id}}
+box.commit();
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+space:select()
+---
+- - [1, [9000, 1, 'test', 'memtx', 0, {}, []]]
+  - [2, [9000, 0, 'primary', 'tree', {'unique': true}, [[0, 'unsigned']]]]
+...
+space:drop()
+---
+...
diff --git a/test/box/transaction.test.lua b/test/box/transaction.test.lua
index 14d1a690d88c77aa252339b3bfd4b81aa1bd8328..e1d258e6c837a50ab4151f00463388969908004f 100644
--- a/test/box/transaction.test.lua
+++ b/test/box/transaction.test.lua
@@ -232,3 +232,19 @@ test_run:cmd("setopt delimiter ''");
 space:select{}
 
 space:drop()
+
+--
+-- gh-3528: sysview engine shouldn't participate in transaction control
+--
+space = box.schema.space.create('test', {id = 9000})
+index = space:create_index('primary')
+
+test_run:cmd("setopt delimiter ';'")
+box.begin()
+space:auto_increment{box.space._vspace:get{space.id}}
+space:auto_increment{box.space._vindex:get{space.id, index.id}}
+box.commit();
+test_run:cmd("setopt delimiter ''");
+
+space:select()
+space:drop()
diff --git a/test/replication/gc.result b/test/replication/gc.result
index e5c5cfccdc9a83d0d56289d4ff97dfc2c43ed917..3f9db26ce07208032382261a6904cd038e00ed02 100644
--- a/test/replication/gc.result
+++ b/test/replication/gc.result
@@ -369,6 +369,89 @@ replica_set.wait_all(test_run)
 replica_set.drop_all(test_run)
 ---
 ...
+--
+-- Check that once a replica is removed from the cluster table,
+-- all xlogs kept for it are removed even if it is configured as
+-- a replication master (gh-3546).
+--
+fio = require('fio')
+---
+...
+fiber = require('fiber')
+---
+...
+-- Start a replica and set it up as a master for this instance.
+test_run:cmd("start server replica")
+---
+- true
+...
+replica_port = test_run:eval('replica', 'return box.cfg.listen')[1]
+---
+...
+replica_port ~= nil
+---
+- true
+...
+box.cfg{replication = replica_port}
+---
+...
+-- Stop the replica and write a few WALs.
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+_ = s:auto_increment{}
+---
+...
+box.snapshot()
+---
+- ok
+...
+_ = s:auto_increment{}
+---
+...
+box.snapshot()
+---
+- ok
+...
+_ = s:auto_increment{}
+---
+...
+box.snapshot()
+---
+- ok
+...
+#fio.glob('./master/*.xlog') == 3 or fio.listdir('./master')
+---
+- true
+...
+-- Delete the replica from the cluster table and check that
+-- all xlog files are removed.
+test_run:cleanup_cluster()
+---
+...
+box.snapshot()
+---
+- ok
+...
+t = fiber.time()
+---
+...
+while #fio.glob('./master/*xlog') > 0 and fiber.time() - t < 10 do fiber.sleep(0.01) end
+---
+...
+#fio.glob('./master/*.xlog') == 0 or fio.listdir('./master')
+---
+- true
+...
+-- Restore the config.
+box.cfg{replication = {}}
+---
+...
 -- Cleanup.
 s:drop()
 ---
diff --git a/test/replication/gc.test.lua b/test/replication/gc.test.lua
index a465140c8316bc2c25c93cb212c87790a77259a2..96f11f8d46466095b52ed345511b42604aaed82b 100644
--- a/test/replication/gc.test.lua
+++ b/test/replication/gc.test.lua
@@ -172,6 +172,42 @@ replica_set.start_all(test_run)
 replica_set.wait_all(test_run)
 replica_set.drop_all(test_run)
 
+--
+-- Check that once a replica is removed from the cluster table,
+-- all xlogs kept for it are removed even if it is configured as
+-- a replication master (gh-3546).
+--
+fio = require('fio')
+fiber = require('fiber')
+
+-- Start a replica and set it up as a master for this instance.
+test_run:cmd("start server replica")
+replica_port = test_run:eval('replica', 'return box.cfg.listen')[1]
+replica_port ~= nil
+box.cfg{replication = replica_port}
+
+-- Stop the replica and write a few WALs.
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+_ = s:auto_increment{}
+box.snapshot()
+_ = s:auto_increment{}
+box.snapshot()
+_ = s:auto_increment{}
+box.snapshot()
+#fio.glob('./master/*.xlog') == 3 or fio.listdir('./master')
+
+-- Delete the replica from the cluster table and check that
+-- all xlog files are removed.
+test_run:cleanup_cluster()
+box.snapshot()
+t = fiber.time()
+while #fio.glob('./master/*xlog') > 0 and fiber.time() - t < 10 do fiber.sleep(0.01) end
+#fio.glob('./master/*.xlog') == 0 or fio.listdir('./master')
+
+-- Restore the config.
+box.cfg{replication = {}}
+
 -- Cleanup.
 s:drop()
 box.error.injection.set("ERRINJ_RELAY_REPORT_INTERVAL", 0)
diff --git a/test/replication/replica_rejoin.result b/test/replication/replica_rejoin.result
new file mode 100644
index 0000000000000000000000000000000000000000..4370fae4ba97f7e1029d1b3170631b912c1aa226
--- /dev/null
+++ b/test/replication/replica_rejoin.result
@@ -0,0 +1,250 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+engine = test_run:get_cfg('engine')
+---
+...
+test_run:cleanup_cluster()
+---
+...
+--
+-- gh-461: check that a replica refetches the last checkpoint
+-- in case it fell behind the master.
+--
+box.schema.user.grant('guest', 'replication')
+---
+...
+_ = box.schema.space.create('test', {engine = engine})
+---
+...
+_ = box.space.test:create_index('pk')
+---
+...
+_ = box.space.test:insert{1}
+---
+...
+_ = box.space.test:insert{2}
+---
+...
+_ = box.space.test:insert{3}
+---
+...
+-- Join a replica, then stop it.
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.info.replication[1].upstream.status == 'follow' or box.info
+---
+- true
+...
+box.space.test:select()
+---
+- - [1]
+  - [2]
+  - [3]
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+-- Restart the server to purge the replica from
+-- the garbage collection state.
+test_run:cmd("restart server default")
+-- Make some checkpoints to remove old xlogs.
+checkpoint_count = box.cfg.checkpoint_count
+---
+...
+box.cfg{checkpoint_count = 1}
+---
+...
+_ = box.space.test:delete{1}
+---
+...
+_ = box.space.test:insert{10}
+---
+...
+box.snapshot()
+---
+- ok
+...
+_ = box.space.test:delete{2}
+---
+...
+_ = box.space.test:insert{20}
+---
+...
+box.snapshot()
+---
+- ok
+...
+_ = box.space.test:delete{3}
+---
+...
+_ = box.space.test:insert{30}
+---
+...
+fio = require('fio')
+---
+...
+#fio.glob(fio.pathjoin(box.cfg.wal_dir, '*.xlog')) -- 1
+---
+- 1
+...
+box.cfg{checkpoint_count = checkpoint_count}
+---
+...
+-- Restart the replica. Since xlogs have been removed,
+-- it is supposed to rejoin without changing id.
+test_run:cmd("start server replica")
+---
+- true
+...
+box.info.replication[2].downstream.vclock ~= nil or box.info
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.info.replication[1].upstream.status == 'follow' or box.info
+---
+- true
+...
+box.space.test:select()
+---
+- - [10]
+  - [20]
+  - [30]
+...
+test_run:cmd("switch default")
+---
+- true
+...
+-- Make sure the replica follows new changes.
+for i = 10, 30, 10 do box.space.test:update(i, {{'!', 1, i}}) end
+---
+...
+vclock = test_run:get_vclock('default')
+---
+...
+_ = test_run:wait_vclock('replica', vclock)
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [10, 10]
+  - [20, 20]
+  - [30, 30]
+...
+-- Check that restart works as usual.
+test_run:cmd("restart server replica")
+box.info.replication[1].upstream.status == 'follow' or box.info
+---
+- true
+...
+box.space.test:select()
+---
+- - [10, 10]
+  - [20, 20]
+  - [30, 30]
+...
+-- Check that rebootstrap is NOT initiated unless the replica
+-- is strictly behind the master.
+box.space.test:replace{1, 2, 3} -- bumps LSN on the replica
+---
+- [1, 2, 3]
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("restart server default")
+checkpoint_count = box.cfg.checkpoint_count
+---
+...
+box.cfg{checkpoint_count = 1}
+---
+...
+for i = 1, 3 do box.space.test:delete{i * 10} end
+---
+...
+box.snapshot()
+---
+- ok
+...
+for i = 1, 3 do box.space.test:insert{i * 100} end
+---
+...
+fio = require('fio')
+---
+...
+#fio.glob(fio.pathjoin(box.cfg.wal_dir, '*.xlog')) -- 1
+---
+- 1
+...
+box.cfg{checkpoint_count = checkpoint_count}
+---
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.info.status -- orphan
+---
+- orphan
+...
+box.space.test:select()
+---
+- - [1, 2, 3]
+  - [10, 10]
+  - [20, 20]
+  - [30, 30]
+...
+-- Cleanup.
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+box.space.test:drop()
+---
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
diff --git a/test/replication/replica_rejoin.test.lua b/test/replication/replica_rejoin.test.lua
new file mode 100644
index 0000000000000000000000000000000000000000..f998f60d03de2f81f8af4d0635fea8c0f43f59a2
--- /dev/null
+++ b/test/replication/replica_rejoin.test.lua
@@ -0,0 +1,91 @@
+env = require('test_run')
+test_run = env.new()
+engine = test_run:get_cfg('engine')
+
+test_run:cleanup_cluster()
+
+--
+-- gh-461: check that a replica refetches the last checkpoint
+-- in case it fell behind the master.
+--
+box.schema.user.grant('guest', 'replication')
+_ = box.schema.space.create('test', {engine = engine})
+_ = box.space.test:create_index('pk')
+_ = box.space.test:insert{1}
+_ = box.space.test:insert{2}
+_ = box.space.test:insert{3}
+
+-- Join a replica, then stop it.
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+box.info.replication[1].upstream.status == 'follow' or box.info
+box.space.test:select()
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+
+-- Restart the server to purge the replica from
+-- the garbage collection state.
+test_run:cmd("restart server default")
+
+-- Make some checkpoints to remove old xlogs.
+checkpoint_count = box.cfg.checkpoint_count
+box.cfg{checkpoint_count = 1}
+_ = box.space.test:delete{1}
+_ = box.space.test:insert{10}
+box.snapshot()
+_ = box.space.test:delete{2}
+_ = box.space.test:insert{20}
+box.snapshot()
+_ = box.space.test:delete{3}
+_ = box.space.test:insert{30}
+fio = require('fio')
+#fio.glob(fio.pathjoin(box.cfg.wal_dir, '*.xlog')) -- 1
+box.cfg{checkpoint_count = checkpoint_count}
+
+-- Restart the replica. Since xlogs have been removed,
+-- it is supposed to rejoin without changing id.
+test_run:cmd("start server replica")
+box.info.replication[2].downstream.vclock ~= nil or box.info
+test_run:cmd("switch replica")
+box.info.replication[1].upstream.status == 'follow' or box.info
+box.space.test:select()
+test_run:cmd("switch default")
+
+-- Make sure the replica follows new changes.
+for i = 10, 30, 10 do box.space.test:update(i, {{'!', 1, i}}) end
+vclock = test_run:get_vclock('default')
+_ = test_run:wait_vclock('replica', vclock)
+test_run:cmd("switch replica")
+box.space.test:select()
+
+-- Check that restart works as usual.
+test_run:cmd("restart server replica")
+box.info.replication[1].upstream.status == 'follow' or box.info
+box.space.test:select()
+
+-- Check that rebootstrap is NOT initiated unless the replica
+-- is strictly behind the master.
+box.space.test:replace{1, 2, 3} -- bumps LSN on the replica
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+test_run:cmd("restart server default")
+checkpoint_count = box.cfg.checkpoint_count
+box.cfg{checkpoint_count = 1}
+for i = 1, 3 do box.space.test:delete{i * 10} end
+box.snapshot()
+for i = 1, 3 do box.space.test:insert{i * 100} end
+fio = require('fio')
+#fio.glob(fio.pathjoin(box.cfg.wal_dir, '*.xlog')) -- 1
+box.cfg{checkpoint_count = checkpoint_count}
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+box.info.status -- orphan
+box.space.test:select()
+
+-- Cleanup.
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+box.space.test:drop()
+box.schema.user.revoke('guest', 'replication')
diff --git a/test/vinyl/replica_rejoin.lua b/test/vinyl/replica_rejoin.lua
new file mode 100644
index 0000000000000000000000000000000000000000..7cb7e09a4d0f75b34773a9e7ab68705a728964af
--- /dev/null
+++ b/test/vinyl/replica_rejoin.lua
@@ -0,0 +1,13 @@
+#!/usr/bin/env tarantool
+
+local replication = os.getenv("MASTER")
+if arg[1] == 'disable_replication' then
+    replication = nil
+end
+
+box.cfg({
+    replication     = replication,
+    vinyl_memory    = 1024 * 1024,
+})
+
+require('console').listen(os.getenv('ADMIN'))
diff --git a/test/vinyl/replica_rejoin.result b/test/vinyl/replica_rejoin.result
new file mode 100644
index 0000000000000000000000000000000000000000..bd5d1ed38048f5a64872a09a0cecd557abf949a5
--- /dev/null
+++ b/test/vinyl/replica_rejoin.result
@@ -0,0 +1,257 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+--
+-- gh-461: check that garbage collection works as expected
+-- after rebootstrap.
+--
+box.schema.user.grant('guest', 'replication')
+---
+...
+_ = box.schema.space.create('test', { id = 9000, engine = 'vinyl' })
+---
+...
+_ = box.space.test:create_index('pk')
+---
+...
+pad = string.rep('x', 15 * 1024)
+---
+...
+for i = 1, 100 do box.space.test:replace{i, pad} end
+---
+...
+box.snapshot()
+---
+- ok
+...
+-- Join a replica. Check its files.
+test_run:cmd("create server replica with rpl_master=default, script='vinyl/replica_rejoin.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+fio = require('fio')
+---
+...
+fio.chdir(box.cfg.vinyl_dir)
+---
+- true
+...
+fio.glob(fio.pathjoin(box.space.test.id, 0, '*'))
+---
+- - 9000/0/00000000000000000002.index
+  - 9000/0/00000000000000000002.run
+  - 9000/0/00000000000000000004.index
+  - 9000/0/00000000000000000004.run
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+-- Invoke garbage collector on the master.
+test_run:cmd("restart server default")
+checkpoint_count = box.cfg.checkpoint_count
+---
+...
+box.cfg{checkpoint_count = 1}
+---
+...
+box.space.test:delete(1)
+---
+...
+box.snapshot()
+---
+- ok
+...
+box.cfg{checkpoint_count = checkpoint_count}
+---
+...
+-- Rebootstrap the replica. Check that old files are removed
+-- by garbage collector.
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.cfg{checkpoint_count = 1}
+---
+...
+box.snapshot()
+---
+- ok
+...
+fio = require('fio')
+---
+...
+fio.chdir(box.cfg.vinyl_dir)
+---
+- true
+...
+fio.glob(fio.pathjoin(box.space.test.id, 0, '*'))
+---
+- - 9000/0/00000000000000000008.index
+  - 9000/0/00000000000000000008.run
+  - 9000/0/00000000000000000010.index
+  - 9000/0/00000000000000000010.run
+...
+box.space.test:count() -- 99
+---
+- 99
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+-- Invoke garbage collector on the master.
+test_run:cmd("restart server default")
+checkpoint_count = box.cfg.checkpoint_count
+---
+...
+box.cfg{checkpoint_count = 1}
+---
+...
+box.space.test:delete(2)
+---
+...
+box.snapshot()
+---
+- ok
+...
+box.cfg{checkpoint_count = checkpoint_count}
+---
+...
+-- Make the master fail join after sending data. Check that
+-- files written during failed rebootstrap attempt are removed
+-- by garbage collector.
+box.error.injection.set('ERRINJ_RELAY_FINAL_JOIN', true)
+---
+- ok
+...
+test_run:cmd("start server replica with crash_expected=True") -- fail
+---
+- false
+...
+test_run:cmd("start server replica with crash_expected=True") -- fail again
+---
+- false
+...
+test_run:cmd("start server replica with args='disable_replication'")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.cfg{checkpoint_count = 1}
+---
+...
+box.snapshot()
+---
+- ok
+...
+fio = require('fio')
+---
+...
+fio.chdir(box.cfg.vinyl_dir)
+---
+- true
+...
+fio.glob(fio.pathjoin(box.space.test.id, 0, '*'))
+---
+- - 9000/0/00000000000000000008.index
+  - 9000/0/00000000000000000008.run
+  - 9000/0/00000000000000000010.index
+  - 9000/0/00000000000000000010.run
+...
+box.space.test:count() -- 99
+---
+- 99
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+box.error.injection.set('ERRINJ_RELAY_FINAL_JOIN', false)
+---
+- ok
+...
+-- Rebootstrap after several failed attempts and make sure
+-- old files are removed.
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.cfg{checkpoint_count = 1}
+---
+...
+box.snapshot()
+---
+- ok
+...
+fio = require('fio')
+---
+...
+fio.chdir(box.cfg.vinyl_dir)
+---
+- true
+...
+fio.glob(fio.pathjoin(box.space.test.id, 0, '*'))
+---
+- - 9000/0/00000000000000000022.index
+  - 9000/0/00000000000000000022.run
+  - 9000/0/00000000000000000024.index
+  - 9000/0/00000000000000000024.run
+...
+box.space.test:count() -- 98
+---
+- 98
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+-- Cleanup.
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+box.space.test:drop()
+---
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
diff --git a/test/vinyl/replica_rejoin.test.lua b/test/vinyl/replica_rejoin.test.lua
new file mode 100644
index 0000000000000000000000000000000000000000..972b04e5bf06fa387db44262d493b7755030e421
--- /dev/null
+++ b/test/vinyl/replica_rejoin.test.lua
@@ -0,0 +1,88 @@
+env = require('test_run')
+test_run = env.new()
+
+--
+-- gh-461: check that garbage collection works as expected
+-- after rebootstrap.
+--
+box.schema.user.grant('guest', 'replication')
+_ = box.schema.space.create('test', { id = 9000, engine = 'vinyl' })
+_ = box.space.test:create_index('pk')
+pad = string.rep('x', 15 * 1024)
+for i = 1, 100 do box.space.test:replace{i, pad} end
+box.snapshot()
+
+-- Join a replica. Check its files.
+test_run:cmd("create server replica with rpl_master=default, script='vinyl/replica_rejoin.lua'")
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+fio = require('fio')
+fio.chdir(box.cfg.vinyl_dir)
+fio.glob(fio.pathjoin(box.space.test.id, 0, '*'))
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+
+-- Invoke garbage collector on the master.
+test_run:cmd("restart server default")
+checkpoint_count = box.cfg.checkpoint_count
+box.cfg{checkpoint_count = 1}
+box.space.test:delete(1)
+box.snapshot()
+box.cfg{checkpoint_count = checkpoint_count}
+
+-- Rebootstrap the replica. Check that old files are removed
+-- by garbage collector.
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+box.cfg{checkpoint_count = 1}
+box.snapshot()
+fio = require('fio')
+fio.chdir(box.cfg.vinyl_dir)
+fio.glob(fio.pathjoin(box.space.test.id, 0, '*'))
+box.space.test:count() -- 99
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+
+-- Invoke garbage collector on the master.
+test_run:cmd("restart server default")
+checkpoint_count = box.cfg.checkpoint_count
+box.cfg{checkpoint_count = 1}
+box.space.test:delete(2)
+box.snapshot()
+box.cfg{checkpoint_count = checkpoint_count}
+
+-- Make the master fail join after sending data. Check that
+-- files written during failed rebootstrap attempt are removed
+-- by garbage collector.
+box.error.injection.set('ERRINJ_RELAY_FINAL_JOIN', true)
+test_run:cmd("start server replica with crash_expected=True") -- fail
+test_run:cmd("start server replica with crash_expected=True") -- fail again
+test_run:cmd("start server replica with args='disable_replication'")
+test_run:cmd("switch replica")
+box.cfg{checkpoint_count = 1}
+box.snapshot()
+fio = require('fio')
+fio.chdir(box.cfg.vinyl_dir)
+fio.glob(fio.pathjoin(box.space.test.id, 0, '*'))
+box.space.test:count() -- 99
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+box.error.injection.set('ERRINJ_RELAY_FINAL_JOIN', false)
+
+-- Rebootstrap after several failed attempts and make sure
+-- old files are removed.
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+box.cfg{checkpoint_count = 1}
+box.snapshot()
+fio = require('fio')
+fio.chdir(box.cfg.vinyl_dir)
+fio.glob(fio.pathjoin(box.space.test.id, 0, '*'))
+box.space.test:count() -- 98
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+
+-- Cleanup.
+test_run:cmd("cleanup server replica")
+box.space.test:drop()
+box.schema.user.revoke('guest', 'replication')
diff --git a/test/vinyl/suite.ini b/test/vinyl/suite.ini
index 322c6a474c4cd5c868ad9020a8dc5afd26964c43..ff928867fff38b059c720e4ccec7e6b7bead4f78 100644
--- a/test/vinyl/suite.ini
+++ b/test/vinyl/suite.ini
@@ -2,7 +2,7 @@
 core = tarantool
 description = vinyl integration tests
 script = vinyl.lua
-release_disabled = errinj.test.lua errinj_gc.test.lua errinj_vylog.test.lua partial_dump.test.lua quota_timeout.test.lua recovery_quota.test.lua
+release_disabled = errinj.test.lua errinj_gc.test.lua errinj_vylog.test.lua partial_dump.test.lua quota_timeout.test.lua recovery_quota.test.lua replica_rejoin.test.lua
 config = suite.cfg
 lua_libs = suite.lua stress.lua large.lua txn_proxy.lua ../box/lua/utils.lua
 use_unix_sockets = True