diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index d1f8cac069e55235baf727a9e507f0bd2ff71501..2a8220a154ebfe045f0e87c10ca6b50d1f9c96c1 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -60,7 +60,7 @@ set (common_sources
      admin.cc
      replica.cc
      iproto.cc
-     iproto_constants.c
+     iproto_constants.cc
      iproto_port.cc
      session.cc
      object.cc
diff --git a/src/admin.cc b/src/admin.cc
index 15ee0af3249ff3c82d572079d33946be69ac68e0..ae2655433cca9c43aa05aa1adac852e3bfbc721f 100644
--- a/src/admin.cc
+++ b/src/admin.cc
@@ -89,8 +89,9 @@ admin_handler(va_list ap)
 	 * stored procedures.
 	 */
 
-	session_set_user(session_create(coio.fd, *(uint64_t *) addr),
-			 ADMIN, ADMIN);
+	struct session *session = session_create(coio.fd, *(uint64_t *) addr);
+	session_set_user(session, ADMIN, ADMIN);
+	trigger_run(&session_on_connect, NULL);
 
 	for (;;) {
 		if (admin_dispatch(&coio, iobuf, L) < 0)
diff --git a/src/bootstrap.snap b/src/bootstrap.snap
index 70257782f59a443cdd8e51a3358586f571749bbd..6ee1bc4150098ff3370e4627f37e8d35b969682b 100644
Binary files a/src/bootstrap.snap and b/src/bootstrap.snap differ
diff --git a/src/box/box.cc b/src/box/box.cc
index fc376105d77e30c278f5ae23eeb4ec1673150e11..9e56cdcae9175af504effaafecdaff352f7506cc 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -63,16 +63,16 @@ box_process_func box_process = process_ro;
 static int stat_base;
 int snapshot_pid = 0; /* snapshot processes pid */
 
+static void
+box_snapshot_cb(struct log_io *l);
 
 /** The snapshot row metadata repeats the structure of REPLACE request. */
-struct box_snap_row {
-	uint16_t op;
+struct request_replace_body {
 	uint8_t m_body;
 	uint8_t k_space_id;
 	uint8_t m_space_id;
 	uint32_t v_space_id;
 	uint8_t k_tuple;
-	char data[];
 } __attribute__((packed));
 
 void
@@ -89,7 +89,7 @@ process_rw(struct port *port, struct request *request)
 	struct txn *txn = txn_begin();
 
 	try {
-		stat_collect(stat_base, request->type, 1);
+		stat_collect(stat_base, request->code, 1);
 		request->execute(request, txn, port);
 		txn_commit(txn);
 		port_send_tuple(port, txn);
@@ -104,7 +104,7 @@ process_rw(struct port *port, struct request *request)
 static void
 process_replica(struct port *port, struct request *request)
 {
-	if (!iproto_request_is_select(request->type)) {
+	if (!iproto_request_is_select(request->code)) {
 		tnt_raise(ClientError, ER_NONMASTER,
 			  cfg.replication_source);
 	}
@@ -114,21 +114,21 @@ process_replica(struct port *port, struct request *request)
 static void
 process_ro(struct port *port, struct request *request)
 {
-	if (!iproto_request_is_select(request->type))
+	if (!iproto_request_is_select(request->code))
 		tnt_raise(LoggedError, ER_SECONDARY);
 	return process_rw(port, request);
 }
 
 static int
-recover_row(void *param __attribute__((unused)), const struct log_row *row)
+recover_row(void *param __attribute__((unused)), struct iproto_packet *packet)
 {
 	try {
-		const char *data = row->data;
-		const char *end = data + row->len;
-		uint16_t op = pick_u16(&data, end);
+		assert(packet->bodycnt == 1); /* always 1 for read */
 		struct request request;
-		request_create(&request, op);
-		request_decode(&request, data, end - data);
+		request_create(&request, packet->code);
+		request_decode(&request, (const char *) packet->body[0].iov_base,
+				packet->body[0].iov_len);
+		request.packet = packet;
 		process_rw(&null_port, &request);
 	} catch (Exception *e) {
 		e->log();
@@ -302,7 +302,7 @@ box_init()
 
 	/* recovery initialization */
 	recovery_init(cfg.snap_dir, cfg.wal_dir,
-		      recover_row, NULL, cfg.rows_per_wal);
+		      recover_row, NULL, box_snapshot_cb, cfg.rows_per_wal);
 	recovery_update_io_rate_limit(recovery_state, cfg.snap_io_rate_limit);
 	recovery_setup_panic(recovery_state, cfg.panic_on_snap_error, cfg.panic_on_wal_error);
 
@@ -327,18 +327,25 @@ static void
 snapshot_write_tuple(struct log_io *l,
 		     uint32_t n, struct tuple *tuple)
 {
-	struct box_snap_row header;
-	header.op = IPROTO_INSERT;
-	header.m_body = 0x82; /* map of two elements. */
-	header.k_space_id = IPROTO_SPACE_ID;
-	header.m_space_id = 0xce; /* uint32 */
-	header.v_space_id = mp_bswap_u32(n);
-	header.k_tuple = IPROTO_TUPLE;
-	snapshot_write_row(l, (const char *) &header, sizeof(header),
-	                   tuple->data, tuple->bsize);
+	struct request_replace_body body;
+	body.m_body = 0x82; /* map of two elements. */
+	body.k_space_id = IPROTO_SPACE_ID;
+	body.m_space_id = 0xce; /* uint32 */
+	body.v_space_id = mp_bswap_u32(n);
+	body.k_tuple = IPROTO_TUPLE;
+
+	struct iproto_packet packet;
+	memset(&packet, 0, sizeof(packet));
+	packet.code = IPROTO_INSERT;
+
+	packet.bodycnt = 2;
+	packet.body[0].iov_base = &body;
+	packet.body[0].iov_len = sizeof(body);
+	packet.body[1].iov_base = tuple->data;
+	packet.body[1].iov_len = tuple->bsize;
+	snapshot_write_row(l, &packet);
 }
 
-
 static void
 snapshot_space(struct space *sp, void *udata)
 {
@@ -356,7 +363,7 @@ snapshot_space(struct space *sp, void *udata)
 		snapshot_write_tuple(l, space_id(sp), tuple);
 }
 
-void
+static void
 box_snapshot_cb(struct log_io *l)
 {
 	space_foreach(snapshot_space, l);
@@ -393,7 +400,7 @@ box_snapshot(void)
 	 * parent stdio buffers at exit().
 	 */
 	close_all_xcpt(1, sayfd);
-	snapshot_save(recovery_state, box_snapshot_cb);
+	snapshot_save(recovery_state);
 
 	exit(EXIT_SUCCESS);
 	return 0;
@@ -404,7 +411,7 @@ box_init_storage(const char *dirname)
 {
 	struct log_dir dir = snap_dir;
 	dir.dirname = (char *) dirname;
-	init_storage(&dir, NULL);
+	init_storage_on_master(&dir);
 }
 
 void
diff --git a/src/box/lua/call.cc b/src/box/lua/call.cc
index b9429b0638bab67991c0436d1d9b6543db697acd..559e66299ae49d999ed30a15be8eaa92f243fdc0 100644
--- a/src/box/lua/call.cc
+++ b/src/box/lua/call.cc
@@ -43,6 +43,7 @@
 #include "box/box.h"
 #include "box/port.h"
 #include "box/request.h"
+#include "box/txn.h"
 #include "bit/bit.h"
 #include "box/access.h"
 #include "box/schema.h"
@@ -502,8 +503,7 @@ access_check_func(const char *name, uint32_t name_len,
  * (implementation of 'CALL' command code).
  */
 void
-box_lua_call(struct request *request, struct txn *txn,
-	     struct port *port)
+box_lua_call(struct request *request, struct txn *txn, struct port *port)
 {
 	struct user *user = user();
 	(void) txn;
diff --git a/src/box/request.cc b/src/box/request.cc
index f4703f1f09689b0caa44ba1d820ddc3b5fbc29d7..21bee5b6e782059e11aad5b2b97e2b6fd539afc1 100644
--- a/src/box/request.cc
+++ b/src/box/request.cc
@@ -109,7 +109,8 @@ execute_replace(struct request *request, struct txn *txn, struct port *port)
 					    request->tuple_end);
 	TupleGuard guard(new_tuple);
 	space_validate_tuple(space, new_tuple);
-	enum dup_replace_mode mode = dup_replace_mode(request->type);
+	enum dup_replace_mode mode = dup_replace_mode(request->code);
+
 	txn_add_redo(txn, request);
 	txn_replace(txn, space, NULL, new_tuple, mode);
 }
@@ -223,33 +224,31 @@ execute_auth(struct request *request, struct txn * /* txn */,
 /** }}} */
 
 void
-request_check_type(uint32_t type)
+request_check_code(uint32_t code)
 {
-	if (type < IPROTO_SELECT || type >= IPROTO_DML_REQUEST_MAX)
-		tnt_raise(LoggedError, ER_UNKNOWN_REQUEST_TYPE, type);
+	if (code < IPROTO_SELECT || code >= IPROTO_DML_REQUEST_MAX)
+		tnt_raise(LoggedError, ER_UNKNOWN_REQUEST_TYPE, code);
 }
 
 void
-request_create(struct request *request, uint32_t type)
+request_create(struct request *request, uint32_t code)
 {
-	request_check_type(type);
+	request_check_code(code);
 	static const request_execute_f execute_map[] = {
 		NULL, execute_select, execute_replace, execute_replace,
 		execute_update, execute_delete, box_lua_call,
 		execute_auth,
 	};
 	memset(request, 0, sizeof(*request));
-	request->type = type;
-	request->execute = execute_map[type];
+	request->execute = execute_map[code];
+	request->code = code;
 }
 
 void
 request_decode(struct request *request, const char *data, uint32_t len)
 {
-	assert(request->type != 0);
+	assert(request->execute != NULL);
 	const char *end = data + len;
-	request->data = data;
-	request->len = len;
 
 	if (mp_typeof(*data) != MP_MAP || mp_check_map(data, end) > 0) {
 error:
@@ -303,16 +302,15 @@ request_decode(struct request *request, const char *data, uint32_t len)
 #endif
 }
 
-void
-request_encode(struct request *request)
+int
+request_encode(struct request *request, struct iovec *iov)
 {
-	assert(request->data == NULL);
+	int iovcnt = 1;
 	const int HEADER_LEN_MAX = 32;
-	uint32_t tuple_len = request->tuple_end - request->tuple;
 	uint32_t key_len = request->key_end - request->key;
-	uint32_t len = HEADER_LEN_MAX + tuple_len + key_len;
-	request->data = (char *) region_alloc(&fiber()->gc, len);
-	char *d = (char *) request->data + 1; /* Skip 1 byte for MP_MAP */
+	uint32_t len = HEADER_LEN_MAX + key_len;
+	char *data = (char *) region_alloc(&fiber()->gc, len);
+	char *d = (char *) data + 1; /* Skip 1 byte for MP_MAP */
 	int map_size = 0;
 	if (true) {
 		d = mp_encode_uint(d, IPROTO_SPACE_ID);
@@ -332,11 +330,16 @@ request_encode(struct request *request)
 	}
 	if (request->tuple) {
 		d = mp_encode_uint(d, IPROTO_TUPLE);
-		memcpy(d, request->tuple, tuple_len);
-		d += tuple_len;
+		iov[1].iov_base = (void *) request->tuple;
+		iov[1].iov_len = (request->tuple_end - request->tuple);
+		iovcnt = 2;
 		map_size++;
 	}
-	request->len = (d - request->data);
-	assert(request->len <= len);
-	mp_encode_map((char *) request->data, map_size);
+
+	assert(d <= data + len);
+	mp_encode_map(data, map_size);
+	iov[0].iov_base = data;
+	iov[0].iov_len = (d - data);
+
+	return iovcnt;
 }
diff --git a/src/box/request.h b/src/box/request.h
index e87a46c4c918a273323b8d3b87ccd1a4e74cec72..5f856beb6dfbff0dd56926b8b010ddbed3c98f42 100644
--- a/src/box/request.h
+++ b/src/box/request.h
@@ -34,13 +34,13 @@
 struct txn;
 struct port;
 
-typedef void (*request_execute_f)(struct request *,
-				  struct txn *,
-				  struct port *);
+typedef void (*request_execute_f)(struct request *, struct txn *, struct port *);
+enum { REQUEST_IOVMAX = IPROTO_PACKET_BODY_IOVMAX };
 
 struct request
 {
-	uint32_t type;
+	struct iproto_packet *packet;
+	uint32_t code;
 	uint32_t space_id;
 	uint32_t index_id;
 	uint32_t offset;
@@ -53,18 +53,16 @@ struct request
 	const char *tuple;
 	const char *tuple_end;
 
-	const char *data;
-	uint32_t len;
 	request_execute_f execute;
 };
 
 void
-request_create(struct request *request, uint32_t type);
+request_create(struct request *request, uint32_t code);
 
 void
 request_decode(struct request *request, const char *data, uint32_t len);
 
-void
-request_encode(struct request *request);
+int
+request_encode(struct request *request, struct iovec *iov);
 
 #endif /* TARANTOOL_BOX_REQUEST_H_INCLUDED */
diff --git a/src/box/txn.cc b/src/box/txn.cc
index e227be0da1c8f208073feb7bc0a6ba0bda3dd930..22ad5b442daae01e932bb51ff77aa1e036cd7300 100644
--- a/src/box/txn.cc
+++ b/src/box/txn.cc
@@ -35,13 +35,28 @@
 #include <fiber.h>
 #include "request.h" /* for request_name */
 
+void
+txn_add_redo(struct txn *txn, struct request *request)
+{
+	if (recovery_state->wal_mode == WAL_NONE)
+		return;
+	if (request->packet != NULL) {
+		txn->packet = request->packet; /* reuse the caller's packet */
+		return;
+	}
+	/* Generate binary body for Lua requests (no packet attached) */
+	struct iproto_packet *redo = (struct iproto_packet *)
+		region_alloc0(&fiber()->gc, sizeof(*redo));
+	redo->code = request->code;
+	redo->bodycnt = request_encode(request, redo->body);
+	txn->packet = redo;
+}
+
 void
 txn_replace(struct txn *txn, struct space *space,
 	    struct tuple *old_tuple, struct tuple *new_tuple,
 	    enum dup_replace_mode mode)
 {
-	/* txn_add_undo() must be done after txn_add_redo() */
-	assert(txn->request->type != 0);
 	assert(old_tuple || new_tuple);
 	/*
 	 * Remember the old tuple only if we replaced it
@@ -77,24 +92,21 @@ txn_commit(struct txn *txn)
 {
 	if ((txn->old_tuple || txn->new_tuple) &&
 	    !space_is_temporary(txn->space)) {
-		struct request *request = txn->request;
+		struct iproto_packet *packet = txn->packet;
 		int64_t lsn = next_lsn(recovery_state);
 
 		int res = 0;
 		if (recovery_state->wal_mode != WAL_NONE) {
-			/* Generate binary body for Lua requests */
-			if (request->data == NULL)
-				request_encode(request);
-
+			/* txn_commit() must be done after txn_add_redo() */
+			assert(txn->packet != NULL);
+			packet->lsn = lsn;
 			ev_tstamp start = ev_now(loop()), stop;
-			res = wal_write(recovery_state, lsn, fiber()->cookie,
-					request->type, request->data,
-					request->len);
+			res = wal_write(recovery_state, packet);
 			stop = ev_now(loop());
 
 			if (stop - start > cfg.too_long_threshold) {
 				say_warn("too long %s: %.3f sec",
-					 iproto_request_name(request->type),
+					 iproto_request_name(packet->code),
 					 stop - start);
 			}
 		}
diff --git a/src/box/txn.h b/src/box/txn.h
index fb0802d3361394390d0c97edc11be13f3a0fc5d8..b066ebcd13fd7a1008ca5584c6214c19362dd456 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -44,15 +44,9 @@ struct txn {
 	struct rlist on_rollback;
 
 	/* Redo info: binary packet */
-	struct request *request;
+	struct iproto_packet *packet;
 };
 
-static inline void
-txn_add_redo(struct txn *txn, struct request *request)
-{
-	txn->request = request;
-}
-
 struct txn *txn_begin();
 void txn_commit(struct txn *txn);
 void txn_finish(struct txn *txn);
@@ -60,4 +54,5 @@ void txn_rollback(struct txn *txn);
 void txn_replace(struct txn *txn, struct space *space,
 		 struct tuple *old_tuple, struct tuple *new_tuple,
 		 enum dup_replace_mode mode);
+void txn_add_redo(struct txn *txn, struct request *request);
 #endif /* TARANTOOL_BOX_TXN_H_INCLUDED */
diff --git a/src/fio.c b/src/fio.c
index 3be6160a5068e7b327ee3343200efb2750978e6e..7f1024bd8e1a1cf9b269e95894d654fc422a30e1 100644
--- a/src/fio.c
+++ b/src/fio.c
@@ -36,6 +36,7 @@
 #include <limits.h>
 #include <stdio.h>
 #include <errno.h>
+#include <lib/bit/bit.h>
 
 #include <say.h>
 
@@ -158,58 +159,78 @@ fio_truncate(int fd, off_t offset)
 }
 
 struct fio_batch *
-fio_batch_alloc(long max_iov)
+fio_batch_alloc(int max_iov)
 {
 	struct fio_batch *batch = (struct fio_batch *)
 		malloc(sizeof(struct fio_batch) +
-		       sizeof(struct iovec) * max_iov);
+		       sizeof(struct iovec) * max_iov +
+		       (max_iov / CHAR_BIT + 1));
 	if (batch == NULL)
 		return NULL;
-	batch->bytes = batch->rows = batch->max_rows = 0;
+	batch->bytes = batch->rows = batch->iovcnt = batch->max_rows = 0;
 	batch->max_iov = max_iov;
+	batch->rowflag = (char *) (batch + 1) + sizeof(struct iovec) * max_iov;
 	return batch;
 }
 
 void
 fio_batch_start(struct fio_batch *batch, long max_rows)
 {
-	batch->bytes = batch->rows = 0;
+	batch->bytes = batch->rows = batch->iovcnt = 0;
 	batch->max_rows = max_rows;
+	memset(batch->rowflag, 0, batch->max_iov / CHAR_BIT + 1);
 }
 
 void
-fio_batch_add(struct fio_batch *batch, void *row, ssize_t row_len)
+fio_batch_add(struct fio_batch *batch, const struct iovec *iov, int iovcnt)
 {
+	assert(!fio_batch_has_space(batch, iovcnt)); /* true means "full" */
+	assert(iovcnt > 0);
 	assert(batch->max_rows > 0);
-	assert(! fio_batch_is_full(batch));
-
-	batch->iov[batch->rows].iov_base = row;
-	batch->iov[batch->rows].iov_len = row_len;
+	for (int i = 0; i < iovcnt; i++) {
+		batch->iov[batch->iovcnt++] = iov[i];
+		batch->bytes += iov[i].iov_len;
+	}
+	bit_set(batch->rowflag, batch->iovcnt - 1); /* mark row's last iov */
 	batch->rows++;
-	batch->bytes += row_len;
 }
 
 int
 fio_batch_write(struct fio_batch *batch, int fd)
 {
-	ssize_t bytes_written = fio_writev(fd, batch->iov, batch->rows);
+	ssize_t bytes_written = fio_writev(fd, batch->iov, batch->iovcnt);
 	if (bytes_written <= 0)
 		return 0;
 
 	if (bytes_written == batch->bytes)
-		return batch->rows;
+		return batch->rows; /* returns the number of written rows */
 
 	say_warn("fio_batch_write, [%s]: partial write,"
 		 " wrote %jd out of %jd bytes",
 		 fio_filename(fd),
 		 (intmax_t) bytes_written, (intmax_t) batch->bytes);
 
-	ssize_t good_bytes = 0;
+	/* Iterate over end of row flags */
+	struct bit_iterator bit_it;
+	bit_iterator_init(&bit_it, batch->rowflag,
+			  batch->max_iov / CHAR_BIT + 1, 1);
+	size_t row_last_iov = bit_iterator_next(&bit_it);
+
+	int good_rows = 0; /* the number of fully written rows */
+	ssize_t good_bytes = 0; /* the number of bytes in fully written rows */
+	ssize_t row_bytes = 0;  /* the number of bytes in the current row */
 	struct iovec *iov = batch->iov;
-	while (iov < batch->iov + batch->rows) {
-		if (good_bytes + iov->iov_len > bytes_written)
+	while (iov < batch->iov + batch->iovcnt) {
+		if (good_bytes + row_bytes + iov->iov_len > bytes_written)
 			break;
-		good_bytes += iov->iov_len;
+		row_bytes += iov->iov_len;
+		if ((iov - batch->iov) == row_last_iov) {
+			/* the end of current row  */
+			good_bytes += row_bytes;
+			row_bytes = 0;
+			good_rows++;
+			row_last_iov = bit_iterator_next(&bit_it);
+		}
 		iov++;
 	}
 	/*
@@ -232,5 +253,5 @@ fio_batch_write(struct fio_batch *batch, int fd)
 	 */
 	if (! errno)
 		errno = EAGAIN;
-	return iov - batch->iov;
+	return good_rows;  /* returns the number of written rows */
 }
diff --git a/src/fio.h b/src/fio.h
index bbece9db8a5b789ec2da6c5977f04dba6eb600ba..957c8273a5097202f6044b84dca382341251dfbb 100644
--- a/src/fio.h
+++ b/src/fio.h
@@ -153,25 +153,32 @@ struct fio_batch
 	ssize_t bytes;
 	/** Total number of batched rows.*/
 	int rows;
+	/** Total number of I/O vectors */
+	int iovcnt;
 	/** A cap on how many rows can be batched. Can be set to INT_MAX. */
 	int max_rows;
 	/** A system cap on how many rows can be batched. */
-	long max_iov;
+	int max_iov;
+	/**
+	 * End of row flags for each iov (bitset). fio_write() tries to
+	 * write {iov, iov, iov with flag} blocks atomically.
+	 */
+	char *rowflag;
 	/* Batched rows. */
 	struct iovec iov[];
 };
 
 struct fio_batch *
-fio_batch_alloc(long max_iov);
+fio_batch_alloc(int max_iov);
 
 /** Begin a new batch write. Set a cap on the number of rows in the batch.  */
 void
 fio_batch_start(struct fio_batch *batch, long max_rows);
 
 static inline bool
-fio_batch_is_full(struct fio_batch *batch)
+fio_batch_has_space(struct fio_batch *batch, int iovcnt)
 {
-	return batch->rows >= batch->max_iov ||
+	return batch->iovcnt + iovcnt > batch->max_iov ||
 		batch->rows >= batch->max_rows;
 }
 
@@ -180,7 +187,7 @@ fio_batch_is_full(struct fio_batch *batch)
  * @pre fio_batch_is_full() == false
  */
 void
-fio_batch_add(struct fio_batch *batch, void *row, ssize_t row_len);
+fio_batch_add(struct fio_batch *batch, const struct iovec *iov, int iovcnt);
 
 /**
  * Write all rows stacked into the batch.
diff --git a/src/iproto.cc b/src/iproto.cc
index 808bbac059762a615252e100b086b2cc151d68ba..560762dcd8bd9d220887bb3709a64c530316a210 100644
--- a/src/iproto.cc
+++ b/src/iproto.cc
@@ -77,9 +77,10 @@ struct iproto_request
 	struct session *session;
 	iproto_request_f process;
 	/* Request message code and sync. */
-	uint32_t header[2];
+	struct iproto_packet packet;
 	/* Box request, if this is a DML */
 	struct request request;
+	size_t total_len;
 };
 
 struct mempool iproto_request_pool;
@@ -383,18 +384,6 @@ iproto_connection_close(struct iproto_connection *con)
 	close(fd);
 }
 
-static inline void
-iproto_validate_header(uint32_t len)
-{
-	if (len > IPROTO_BODY_LEN_MAX) {
-		/*
-		 * The package is too big, just close connection for now to
-		 * avoid DoS.
-		 */
-		tnt_raise(IllegalParams, "received packet is too big");
-	}
-}
-
 /**
  * If there is no space for reading input, we can do one of the
  * following:
@@ -468,73 +457,24 @@ iproto_connection_input_iobuf(struct iproto_connection *con)
 	return newbuf;
 }
 
-
-int64_t
-subscribe_request_decode(const char *begin, const char *end)
-{
-	const char *pos = begin;
-	if (mp_check(&pos, end))
-		goto error;
-	if (mp_typeof(*begin) != MP_MAP)
-		goto error;
-	mp_decode_map(&begin);
-	/* Key */
-	if (mp_typeof(*begin) != MP_UINT)
-		goto error;
-	mp_decode_uint(&begin);
-	/* Value */
-	if (mp_typeof(*begin) != MP_UINT)
-		goto error;
-	return mp_decode_uint(&begin);
-error:
-	tnt_raise(ClientError, ER_INVALID_MSGPACK, "subscribe request body");
-}
-
-static inline void
-iproto_decode_header(const char **pos, const char *end, uint32_t *keys)
-{
-	/* Only a small map can be here. */
-	if (mp_typeof(**pos) != MP_MAP || mp_check_map(*pos, end) > 0) {
-error:
-		tnt_raise(ClientError,
-			  ER_INVALID_MSGPACK, "packet header");
-	}
-	uint32_t size = mp_decode_map(pos);
-	for (int i = 0; i < size; i++) {
-
-		if (! iproto_header_has_key(*pos, end)) {
-			mp_check(pos, end);
-			mp_check(pos, end);
-			continue;
-		}
-
-		unsigned char key = mp_decode_uint(pos);
-
-		if (mp_typeof(**pos) != MP_UINT ||
-		    mp_check_uint(*pos, end) > 0)
-			goto error;
-
-		keys[key] = mp_decode_uint(pos);
-	}
-}
-
 static void
 iproto_process_admin(struct iproto_request *ireq,
-		     struct iproto_connection *con,
-		     const char *body, const char *end)
+		     struct iproto_connection *con)
 {
-	switch (ireq->header[IPROTO_CODE]) {
+	switch (ireq->packet.code) {
 	case IPROTO_PING:
-		iproto_reply_ping(&ireq->iobuf->out,
-				  ireq->header[IPROTO_SYNC]);
+		iproto_reply_ping(&ireq->iobuf->out, ireq->packet.sync);
 		break;
 	case IPROTO_SUBSCRIBE:
-		subscribe(con->input.fd,
-			  subscribe_request_decode(body, end));
+		if (ireq->packet.bodycnt != 0) {
+			tnt_raise(ClientError, ER_INVALID_MSGPACK,
+				  "subscribe request body");
+		}
+		subscribe(con->input.fd, ireq->packet.lsn, ireq->packet.sync);
 		tnt_raise(IprotoConnectionShutdown);
 	default:
 		tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE,
-			  (uint32_t) ireq->header[IPROTO_CODE]);
+			   (uint32_t) ireq->packet.code);
 	}
 	if (! ev_is_active(&con->output))
 		ev_feed_event(con->loop, &con->output, EV_WRITE);
@@ -556,7 +496,8 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
 		if (mp_check_uint(pos, in->end) >= 0)
 			break;
 		uint32_t len = mp_decode_uint(&pos);
-		iproto_validate_header(len);
+
+		/* Skip fixheader */
 		const char *reqend = pos + len;
 		if (reqend > in->end)
 			break;
@@ -564,25 +505,32 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
 			iproto_request_new(con, iproto_process_dml);
 		IprotoRequestGuard guard(ireq);
 
-		ireq->header[IPROTO_CODE] = ireq->header[IPROTO_SYNC] = 0;
-		iproto_decode_header(&pos, reqend, ireq->header);
+		iproto_packet_decode(&ireq->packet, &pos, reqend);
+		ireq->total_len = pos - reqstart; /* total request length */
+
 		/*
 		 * sic: in case of exception con->parse_size
 		 * as well as in->pos must not be advanced, to
 		 * stay in sync.
 		 */
-		if (iproto_request_is_dml(ireq->header[IPROTO_CODE])) {
-			request_create(&ireq->request,
-				       ireq->header[IPROTO_CODE]);
-			request_decode(&ireq->request, pos, reqend - pos);
+		if (iproto_request_is_dml(ireq->packet.code)) {
+			if (ireq->packet.bodycnt == 0) {
+				tnt_raise(IllegalParams,
+					  "Invalid MsgPack - invalid request");
+			}
+			request_create(&ireq->request, ireq->packet.code);
+			pos = (const char *) ireq->packet.body[0].iov_base;
+			request_decode(&ireq->request, pos,
+				       ireq->packet.body[0].iov_len);
+			ireq->request.packet = &ireq->packet;
 			iproto_queue_push(&request_queue, guard.release());
-			/* Request header can be discarded. */
-			in->pos += pos - reqstart;
-	        } else {
-			iproto_process_admin(ireq, con, pos, reqend);
+			/* Request will be discarded in iproto_process_dml */
+		} else {
+			iproto_process_admin(ireq, con);
 			/* Entire request can be discarded. */
-			in->pos += reqend - reqstart;
+			in->pos += ireq->total_len;
 		}
+
 		/* Request is parsed */
 		con->parse_size -= reqend - reqstart;
 		if (con->parse_size == 0)
@@ -723,7 +671,8 @@ iproto_process_dml(struct iproto_request *ireq)
 	struct iproto_connection *con = ireq->connection;
 
 	auto scope_guard = make_scoped_guard([=]{
-		iobuf->in.pos += ireq->request.len;
+		/* Discard request (see iproto_enqueue_batch()) */
+		iobuf->in.pos += ireq->total_len;
 
 		if (evio_is_active(&con->output)) {
 			if (! ev_is_active(&con->output))
@@ -741,13 +690,13 @@ iproto_process_dml(struct iproto_request *ireq)
 	struct obuf *out = &iobuf->out;
 
 	struct iproto_port port;
-	iproto_port_init(&port, out, ireq->header[IPROTO_SYNC]);
+	iproto_port_init(&port, out, ireq->packet.sync);
 	try {
 		box_process((struct port *) &port, &ireq->request);
 	} catch (ClientError *e) {
 		if (port.found)
 			obuf_rollback_to_svp(out, &port.svp);
-		iproto_reply_error(out, e, ireq->header[IPROTO_SYNC]);
+		iproto_reply_error(out, e, ireq->packet.sync);
 	}
 }
 
@@ -793,8 +742,9 @@ iproto_process_connect(struct iproto_request *request)
 		con->session = session_create(fd, con->cookie);
 		coio_write(&con->input, iproto_greeting(con->session->salt),
 			   IPROTO_GREETING_SIZE);
+		trigger_run(&session_on_connect, NULL);
 	} catch (ClientError *e) {
-		iproto_reply_error(&iobuf->out, e, request->header[IPROTO_SYNC]);
+		iproto_reply_error(&iobuf->out, e, request->packet.sync);
 		try {
 			iproto_flush(iobuf, fd, &con->write_pos);
 		} catch (Exception *e) {
diff --git a/src/iproto_constants.c b/src/iproto_constants.c
deleted file mode 100644
index 31ed925aecacc9786e04dec2d1cf696417cca9a9..0000000000000000000000000000000000000000
--- a/src/iproto_constants.c
+++ /dev/null
@@ -1,68 +0,0 @@
-#include "iproto_constants.h"
-#include "msgpuck/msgpuck.h"
-
-unsigned char iproto_key_type[IPROTO_KEY_MAX] =
-{
-	/* {{{ header */
-		/* 0x00 */	MP_UINT, /* IPROTO_CODE */
-		/* 0x01 */	MP_UINT, /* IPROTO_SYNC */
-	/* }}} */
-
-	/* {{{ unused */
-		/* 0x02 */	MP_UINT,
-		/* 0x03 */	MP_UINT,
-		/* 0x04 */	MP_UINT,
-		/* 0x05 */	MP_UINT,
-		/* 0x06 */	MP_UINT,
-		/* 0x07 */	MP_UINT,
-		/* 0x08 */	MP_UINT,
-		/* 0x09 */	MP_UINT,
-		/* 0x0a */	MP_UINT,
-		/* 0x0b */	MP_UINT,
-		/* 0x0c */	MP_UINT,
-		/* 0x0d */	MP_UINT,
-		/* 0x0e */	MP_UINT,
-		/* 0x0f */	MP_UINT,
-	/* }}} */
-
-	/* {{{ body -- integer keys */
-		/* 0x10 */	MP_UINT, /* IPROTO_SPACE_ID */
-		/* 0x11 */	MP_UINT, /* IPROTO_INDEX_ID */
-		/* 0x12 */	MP_UINT, /* IPROTO_LIMIT */
-		/* 0x13 */	MP_UINT, /* IPROTO_OFFSET */
-		/* 0x14 */	MP_UINT, /* IPROTO_ITERATOR */
-	/* }}} */
-
-	/* {{{ unused */
-		/* 0x15 */	MP_UINT,
-		/* 0x16 */	MP_UINT,
-		/* 0x17 */	MP_UINT,
-		/* 0x18 */	MP_UINT,
-		/* 0x19 */	MP_UINT,
-		/* 0x1a */	MP_UINT,
-		/* 0x1b */	MP_UINT,
-		/* 0x1c */	MP_UINT,
-		/* 0x1d */	MP_UINT,
-		/* 0x1e */	MP_UINT,
-		/* 0x1f */	MP_UINT,
-	/* }}} */
-
-	/* {{{ body -- all keys */
-	/* 0x20 */	MP_ARRAY, /* IPROTO_KEY */
-	/* 0x21 */	MP_ARRAY, /* IPROTO_TUPLE */
-	/* 0x22 */	MP_STR, /* IPROTO_FUNCTION_NAME */
-	/* 0x23 */	MP_STR, /* IPROTO_USER_NAME */
-	/* }}} */
-};
-
-const char *iproto_request_type_strs[] =
-{
-	NULL,
-	"SELECT",
-	"INSERT",
-	"REPLACE",
-	"UPDATE",
-	"DELETE",
-	"CALL",
-	NULL,
-};
diff --git a/src/iproto_constants.cc b/src/iproto_constants.cc
new file mode 100644
index 0000000000000000000000000000000000000000..7aa1d975e417fbdc3f2477ac370808fa57206ff8
--- /dev/null
+++ b/src/iproto_constants.cc
@@ -0,0 +1,193 @@
+/*
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "iproto_constants.h"
+#include "msgpuck/msgpuck.h"
+#include "exception.h"
+#include "fiber.h"
+#include "crc32.h"
+
+const unsigned char iproto_key_type[IPROTO_KEY_MAX] =
+{
+	/* {{{ header */
+		/* 0x00 */	MP_UINT,   /* IPROTO_CODE */
+		/* 0x01 */	MP_UINT,   /* IPROTO_SYNC */
+		/* 0x02 */	MP_UINT,   /* IPROTO_SERVER_ID */
+		/* 0x03 */	MP_UINT,   /* IPROTO_LSN */
+		/* 0x04 */	MP_DOUBLE, /* IPROTO_TIMESTAMP */
+	/* }}} */
+
+	/* {{{ unused */
+		/* 0x05 */	MP_UINT,
+		/* 0x06 */	MP_UINT,
+		/* 0x07 */	MP_UINT,
+		/* 0x08 */	MP_UINT,
+		/* 0x09 */	MP_UINT,
+		/* 0x0a */	MP_UINT,
+		/* 0x0b */	MP_UINT,
+		/* 0x0c */	MP_UINT,
+		/* 0x0d */	MP_UINT,
+		/* 0x0e */	MP_UINT,
+		/* 0x0f */	MP_UINT,
+	/* }}} */
+
+	/* {{{ body -- integer keys */
+		/* 0x10 */	MP_UINT, /* IPROTO_SPACE_ID */
+		/* 0x11 */	MP_UINT, /* IPROTO_INDEX_ID */
+		/* 0x12 */	MP_UINT, /* IPROTO_LIMIT */
+		/* 0x13 */	MP_UINT, /* IPROTO_OFFSET */
+		/* 0x14 */	MP_UINT, /* IPROTO_ITERATOR */
+	/* }}} */
+
+	/* {{{ unused */
+		/* 0x15 */	MP_UINT,
+		/* 0x16 */	MP_UINT,
+		/* 0x17 */	MP_UINT,
+		/* 0x18 */	MP_UINT,
+		/* 0x19 */	MP_UINT,
+		/* 0x1a */	MP_UINT,
+		/* 0x1b */	MP_UINT,
+		/* 0x1c */	MP_UINT,
+		/* 0x1d */	MP_UINT,
+		/* 0x1e */	MP_UINT,
+		/* 0x1f */	MP_UINT,
+	/* }}} */
+
+	/* {{{ body -- all keys */
+	/* 0x20 */	MP_ARRAY, /* IPROTO_KEY */
+	/* 0x21 */	MP_ARRAY, /* IPROTO_TUPLE */
+	/* 0x22 */	MP_STR, /* IPROTO_FUNCTION_NAME */
+	/* 0x23 */	MP_STR, /* IPROTO_USER_NAME */
+	/* }}} */
+};
+
+const char *iproto_request_type_strs[] =
+{
+	NULL,
+	"SELECT",
+	"INSERT",
+	"REPLACE",
+	"UPDATE",
+	"DELETE",
+	"CALL"
+};
+
+void
+iproto_packet_decode(struct iproto_packet *packet, const char **pos,
+		     const char *end)
+{
+	memset(packet, 0, sizeof(*packet));
+	const char *pos2 = *pos;
+	if (mp_check(&pos2, end) != 0) {
+error:
+		tnt_raise(ClientError, ER_INVALID_MSGPACK, "packet header");
+	}
+
+	if (mp_typeof(**pos) != MP_MAP)
+		goto error;
+	uint32_t size = mp_decode_map(pos);
+	for (uint32_t i = 0; i < size; i++) {
+		if (mp_typeof(**pos) != MP_UINT)
+			goto error;
+		uint64_t key = mp_decode_uint(pos); /* no truncation */
+		if (key >= IPROTO_KEY_MAX || /* would index past the table */
+		    iproto_key_type[key] != mp_typeof(**pos))
+			goto error;
+		switch (key) {
+		case IPROTO_CODE:
+			packet->code = mp_decode_uint(pos);
+			break;
+		case IPROTO_SYNC:
+			packet->sync = mp_decode_uint(pos);
+			break;
+		case IPROTO_LSN:
+			packet->lsn = mp_decode_uint(pos);
+			break;
+		case IPROTO_TIMESTAMP:
+			packet->tm = mp_decode_double(pos);
+			break;
+		default:
+			/* unknown header */
+			mp_next(pos);
+		}
+	}
+
+	assert(*pos <= end);
+	if (*pos < end) {
+		packet->bodycnt = 1;
+		packet->body[0].iov_base = (void *) *pos;
+		packet->body[0].iov_len = (end - *pos);
+		*pos = end;
+	}
+}
+
+int
+iproto_packet_encode(const struct iproto_packet *packet, struct iovec *iov)
+{
+	enum { HEADER_LEN_MAX = 40 };
+
+	/* allocate memory for sign + header */
+	char *data = (char *) region_alloc(&fiber()->gc, HEADER_LEN_MAX);
+
+	/* Header */
+	char *d = data + 1; /* Skip 1 byte for MP_MAP */
+	int map_size = 0;
+	if (true) {
+		d = mp_encode_uint(d, IPROTO_CODE);
+		d = mp_encode_uint(d, packet->code);
+		map_size++;
+	}
+
+	if (packet->sync) {
+		d = mp_encode_uint(d, IPROTO_SYNC);
+		d = mp_encode_uint(d, packet->sync);
+		map_size++;
+	}
+
+	if (packet->lsn) {
+		d = mp_encode_uint(d, IPROTO_LSN);
+		d = mp_encode_uint(d, packet->lsn);
+		map_size++;
+	}
+
+	if (packet->tm) {
+		d = mp_encode_uint(d, IPROTO_TIMESTAMP);
+		d = mp_encode_double(d, packet->tm);
+		map_size++;
+	}
+
+	assert(d <= data + HEADER_LEN_MAX);
+	mp_encode_map(data, map_size);
+	iov->iov_base = data;
+	iov->iov_len = (d - data);
+	iov++;
+
+	assert(1 + packet->bodycnt <= IPROTO_PACKET_IOVMAX); /* check, then copy */
+	memcpy(iov, packet->body, sizeof(*iov) * packet->bodycnt);
+	return 1 + packet->bodycnt; /* new iovcnt */
+}
diff --git a/src/iproto_constants.h b/src/iproto_constants.h
index 63ff74afe499c822529c5ce3d4f4e80045040137..3309d043b46ca7bf95390aa19d185bf2ce956df8 100644
--- a/src/iproto_constants.h
+++ b/src/iproto_constants.h
@@ -30,17 +30,29 @@
  */
 #include <stdbool.h>
 #include <stdint.h>
+#include <sys/uio.h> /* struct iovec */
+#include <msgpuck/msgpuck.h>
+
+#if defined(__cplusplus)
+extern "C" {
+#endif
 
 enum {
 	/** Maximal iproto package body length (2GiB) */
 	IPROTO_BODY_LEN_MAX = 2147483648UL,
 	IPROTO_GREETING_SIZE = 128,
+	IPROTO_FIXHEADER_SIZE = 5, /* len + (padding) */
+	XLOG_FIXHEADER_SIZE = 19 /* marker + len + prev crc32 + cur crc32 + (padding) */
 };
 
 
 enum iproto_key {
 	IPROTO_CODE = 0x00,
 	IPROTO_SYNC = 0x01,
+	/* replication keys */
+	IPROTO_SERVER_ID = 0x02,
+	IPROTO_LSN = 0x03,
+	IPROTO_TIMESTAMP = 0x04,
 	/* Leave a gap for other keys in the header. */
 	IPROTO_SPACE_ID = 0x10,
 	IPROTO_INDEX_ID = 0x11,
@@ -60,7 +72,7 @@ enum iproto_key {
 
 #define bit(c) (1ULL<<IPROTO_##c)
 
-#define IPROTO_HEAD_BMAP (bit(CODE) | bit(SYNC))
+#define IPROTO_HEAD_BMAP (bit(CODE) | bit(SYNC) | bit(SERVER_ID) | bit(LSN))
 #define IPROTO_BODY_BMAP (bit(SPACE_ID) | bit(INDEX_ID) | bit(LIMIT) |\
 			  bit(OFFSET) | bit(KEY) | bit(TUPLE) | \
 			  bit(FUNCTION_NAME) | bit(USER_NAME))
@@ -80,8 +92,7 @@ iproto_body_has_key(const char *pos, const char *end)
 
 #undef bit
 
-
-extern unsigned char iproto_key_type[IPROTO_KEY_MAX];
+extern const unsigned char iproto_key_type[IPROTO_KEY_MAX];
 
 enum iproto_request_type {
 	IPROTO_SELECT = 1,
@@ -118,4 +129,51 @@ iproto_request_is_dml(uint32_t type)
 	return type < IPROTO_DML_REQUEST_MAX;
 }
 
+enum {
+	IPROTO_PACKET_HEAD_IOVMAX = 1,
+	IPROTO_PACKET_BODY_IOVMAX = 2,
+	IPROTO_PACKET_IOVMAX = IPROTO_PACKET_HEAD_IOVMAX +
+		IPROTO_PACKET_BODY_IOVMAX
+};
+
+struct iproto_packet {
+	uint32_t code; /* IPROTO_CODE header value, always encoded */
+	uint64_t sync; /* IPROTO_SYNC header value; 0 is not encoded */
+	uint64_t lsn; /* IPROTO_LSN header value; 0 is not encoded */
+	double tm; /* IPROTO_TIMESTAMP header value; 0 is not encoded */
+
+	int bodycnt; /* number of used entries in body[] */
+	struct iovec body[IPROTO_PACKET_BODY_IOVMAX];
+};
+
+void
+iproto_packet_decode(struct iproto_packet *packet, const char **pos, const char *end);
+int
+iproto_packet_encode(const struct iproto_packet *packet, struct iovec *out);
+
+struct iproto_subscribe {
+	uint8_t m_len;                          /* MP_UINT32 */
+	uint32_t v_len;                         /* length */
+	uint8_t m_header;                       /* MP_MAP */
+	uint8_t k_code;                         /* IPROTO_CODE */
+	uint8_t v_code;                         /* packet code */
+	uint8_t k_sync;                         /* IPROTO_SYNC */
+	uint8_t m_sync;                         /* MP_UINT64 */
+	uint64_t sync;                          /* sync */
+	uint8_t k_lsn;                          /* IPROTO_LSN */
+	uint8_t m_lsn;                          /* MP_UINT64 */
+	uint64_t lsn;                           /* lsn */
+} __attribute__((packed));
+
+static const struct iproto_subscribe iproto_subscribe_stub = {
+	0xce, mp_bswap_u32(sizeof(struct iproto_subscribe) - 5), 0x83,
+	IPROTO_CODE, IPROTO_SUBSCRIBE, /* 0x83 above: map of 3 pairs */
+	IPROTO_SYNC, 0xcf, 0, /* value is filled in by the sender */
+	IPROTO_LSN, 0xcf, 0 /* value is filled in by the sender */
+};
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif
+
 #endif /* TARANTOOL_IPROTO_CONSTANTS_H_INCLUDED */
diff --git a/src/iproto_port.cc b/src/iproto_port.cc
index f8a3f38339e2f88618dafad60af6f17022d891de..15bcc8fabb3c8c0317700ddc5ac98f89df6f985f 100644
--- a/src/iproto_port.cc
+++ b/src/iproto_port.cc
@@ -39,11 +39,11 @@ struct iproto_header_bin {
 	uint32_t v_code;                        /* response status */
 	uint8_t k_sync;                         /* IPROTO_SYNC */
-	uint8_t m_sync;                         /* MP_UIN32 */
-	uint32_t v_sync;                        /* sync */
+	uint8_t m_sync;                         /* MP_UINT64 */
+	uint64_t v_sync;                        /* sync */
 } __attribute__((packed));
 
 static const struct iproto_header_bin iproto_header_bin = {
-	0xce, 0, 0x82, IPROTO_CODE, 0xce, 0, IPROTO_SYNC, 0xce, 0
+	0xce, 0, 0x82, IPROTO_CODE, 0xce, 0, IPROTO_SYNC, 0xcf, 0
 };
 
 struct iproto_body_bin {
@@ -62,16 +62,16 @@ static const struct iproto_body_bin iproto_error_bin = {
 };
 
 void
-iproto_reply_ping(struct obuf *out, uint32_t sync)
+iproto_reply_ping(struct obuf *out, uint64_t sync)
 {
 	struct iproto_header_bin reply = iproto_header_bin;
 	reply.v_len = mp_bswap_u32(sizeof(iproto_header_bin) - 5);
-	reply.v_sync = mp_bswap_u32(sync);
+	reply.v_sync = mp_bswap_u64(sync);
 	obuf_dup(out, &reply, sizeof(reply));
 }
 
 void
-iproto_reply_error(struct obuf *out, const ClientError *e, uint32_t sync)
+iproto_reply_error(struct obuf *out, const ClientError *e, uint64_t sync)
 {
 	uint32_t msg_len = strlen(e->errmsg());
 
@@ -81,7 +81,7 @@ iproto_reply_error(struct obuf *out, const ClientError *e, uint32_t sync)
 	uint32_t len = sizeof(header) - 5 + sizeof(body) + msg_len;
 	header.v_len = mp_bswap_u32(len);
 	header.v_code = mp_bswap_u32(tnt_errcode_val(e->errcode()));
-	header.v_sync = mp_bswap_u32(sync);
+	header.v_sync = mp_bswap_u64(sync);
 
 	body.v_data_len = mp_bswap_u32(msg_len);
 
@@ -110,7 +110,7 @@ iproto_port_eof(struct port *ptr)
 
 	struct iproto_header_bin header = iproto_header_bin;
 	header.v_len = mp_bswap_u32(len);
-	header.v_sync = mp_bswap_u32(port->sync);
+	header.v_sync = mp_bswap_u64(port->sync);
 
 	struct iproto_body_bin body = iproto_body_bin;
 	body.v_data_len = mp_bswap_u32(port->found);
diff --git a/src/iproto_port.h b/src/iproto_port.h
index 9de89ab8637c0f2f0a62f0b08a6b35974a5d93a8..b1791fecb31969029946ed00ad4509e722201198 100644
--- a/src/iproto_port.h
+++ b/src/iproto_port.h
@@ -60,7 +60,7 @@ struct iproto_port
 	/** Output buffer. */
 	struct obuf *buf;
 	/** Reply header. */
-	uint32_t sync;
+	uint64_t sync;
 	uint32_t found;
 	/** A pointer in the reply buffer where the reply starts. */
 	struct obuf_svp svp;
@@ -70,7 +70,7 @@ extern struct port_vtab iproto_port_vtab;
 
 static inline void
 iproto_port_init(struct iproto_port *port, struct obuf *buf,
-		 uint32_t sync)
+		 uint64_t sync)
 {
 	port->vtab = &iproto_port_vtab;
 	port->buf = buf;
@@ -80,11 +80,10 @@ iproto_port_init(struct iproto_port *port, struct obuf *buf,
 
 /** Stack a reply to 'ping' packet. */
 void
-iproto_reply_ping(struct obuf *out, uint32_t sync);
+iproto_reply_ping(struct obuf *out, uint64_t sync);
 
 /** Send an error packet back. */
 void
-iproto_reply_error(struct obuf *out, const ClientError *e,
-		   uint32_t sync);
+iproto_reply_error(struct obuf *out, const ClientError *e, uint64_t sync);
 
 #endif /* TARANTOOL_IPROTO_PORT_H_INCLUDED */
diff --git a/src/log_io.cc b/src/log_io.cc
index e7e699876b2d07582f0730e7e40fc088d16bad01..7fff98fd78e82c0c5f10068d48febadf91600957 100644
--- a/src/log_io.cc
+++ b/src/log_io.cc
@@ -35,37 +35,22 @@
 #include "fio.h"
 #include "tarantool_eio.h"
 #include "fiob.h"
+#include "msgpuck/msgpuck.h"
+#include "iproto_constants.h"
 
 const uint32_t xlog_format = 12;
-const log_magic_t row_marker = 0xba0babed;
-const log_magic_t eof_marker = 0x10adab1e;
+
+/*
+ * marker is MsgPack fixext2
+ * +--------+--------+--------+--------+
+ * |  0xd5  |  type  |       data      |
+ * +--------+--------+--------+--------+
+ */
+const log_magic_t row_marker = mp_bswap_u32(0xd5ba0bab); /* host byte order */
+const log_magic_t eof_marker = mp_bswap_u32(0xd510aded); /* host byte order */
 const char inprogress_suffix[] = ".inprogress";
 const char v12[] = "0.12\n";
 
-void
-log_row_sign(struct log_row *header)
-{
-	header->data_crc32c = crc32_calc(0, header->data, header->len);
-	header->header_crc32c = crc32_calc(0, header->header, sizeof(*header) -
-					   offsetof(struct log_row, header));
-}
-
-void
-log_row_fill(struct log_row *row, int64_t lsn, uint64_t cookie,
-	     const char *metadata, size_t metadata_len, const char *data,
-	     size_t data_len)
-{
-	row->marker = row_marker;
-	row->tag  = WAL; /* unused. */
-	row->cookie = cookie;
-	row->lsn = lsn;
-	row->tm = ev_now(loop());
-	row->len = metadata_len + data_len;
-
-	memcpy(row->data, metadata, metadata_len);
-	memcpy(row->data + metadata_len, data, data_len);
-}
-
 struct log_dir snap_dir = {
 	/* .panic_if_error = */ false,
 	/* .sync_is_async = */ false,
@@ -222,39 +207,97 @@ format_filename(struct log_dir *dir, int64_t lsn, enum log_suffix suffix)
 
 /* {{{ struct log_io_cursor */
 
-static struct log_row ROW_EOF;
-
-static const struct log_row *
-row_reader(FILE *f)
+static int
+row_reader(FILE *f, struct iproto_packet *packet)
 {
-	struct log_row m;
-
-	uint32_t header_crc, data_crc;
+	const char *data;
 
-	if (fread(&m.header_crc32c, sizeof(m) - sizeof(log_magic_t), 1, f) != 1)
-		return &ROW_EOF;
+	/* Read the fixed header that follows the row marker */
+	char fixheader[XLOG_FIXHEADER_SIZE - sizeof(log_magic_t)];
+	if (fread(fixheader, sizeof(fixheader), 1, f) != 1) {
+		if (feof(f))
+			return 1; /* end of file */
+error:
+		tnt_raise(ClientError, ER_INVALID_MSGPACK,
+			  "invalid fixed header");
+	}
 
-	header_crc = crc32_calc(0, m.header, sizeof(struct log_row) -
-				offsetof(struct log_row, header));
+	/* Decode len, previous crc32 and row crc32 */
+	data = fixheader;
+	if (mp_check(&data, data + sizeof(fixheader)) != 0)
+		goto error;
+	data = fixheader;
 
-	if (m.header_crc32c != header_crc) {
-		say_error("header crc32c mismatch");
-		return NULL;
+	/* Read length */
+	if (mp_typeof(*data) != MP_UINT)
+		goto error;
+	uint32_t len = mp_decode_uint(&data);
+	if (len > IPROTO_BODY_LEN_MAX) {
+		tnt_raise(ClientError, ER_INVALID_MSGPACK,
+			  "received packet is too big");
 	}
-	char *row = (char *) region_alloc(&fiber()->gc, sizeof(m) + m.len);
-	memcpy(row, &m, sizeof(m));
 
-	if (fread(row + sizeof(m), m.len, 1, f) != 1)
-		return &ROW_EOF;
+	/* Read previous crc32 */
+	if (mp_typeof(*data) != MP_UINT)
+		goto error;
+	uint32_t crc32p = mp_decode_uint(&data);
+
+	/* Read current crc32 */
+	if (mp_typeof(*data) != MP_UINT)
+		goto error;
+	uint32_t crc32c = mp_decode_uint(&data);
+	assert(data <= fixheader + sizeof(fixheader));
+	(void) crc32p; /* decoded but not verified here */
+
+	/* Allocate memory for the packet header and body */
+	char *bodybuf = (char *) region_alloc(&fiber()->gc, len);
+
+	/* Read header and body */
+	if (fread(bodybuf, len, 1, f) != 1)
+		return 1; /* incomplete row: reported like end of file */
+
+	/* Validate checksum */
+	if (crc32_calc(0, bodybuf, len) != crc32c)
+		tnt_raise(ClientError, ER_INVALID_MSGPACK, "invalid crc32");
+
+	data = bodybuf;
+	iproto_packet_decode(packet, &data, bodybuf + len);
+
+	return 0;
+}
 
-	data_crc = crc32_calc(0, row + sizeof(m), m.len);
-	if (m.data_crc32c != data_crc) {
-		say_error("data crc32c mismatch");
-		return NULL;
+int
+xlog_encode_row(const struct iproto_packet *packet, struct iovec *iov,
+		char fixheader[XLOG_FIXHEADER_SIZE])
+{
+	int iovcnt = iproto_packet_encode(packet, iov + 1) + 1;
+	uint32_t len = 0; /* total size of iov[1..] */
+	uint32_t crc32p = 0; /* crc32 of the previous row: not computed */
+	uint32_t crc32c = 0; /* crc32 over iov[1..] */
+	for (int i = 1; i < iovcnt; i++) {
+		crc32c = crc32_calc(crc32c, (const char *) iov[i].iov_base,
+				    iov[i].iov_len);
+		len += iov[i].iov_len;
 	}
 
-	say_debug("read row v11 success lsn:%lld", (long long) m.lsn);
-	return (const struct log_row *) row;
+	/* Zero the buffer so that the padding payload is deterministic */
+	memset(fixheader, 0, XLOG_FIXHEADER_SIZE);
+	memcpy(fixheader, &row_marker, sizeof(row_marker)); /* may be unaligned */
+	char *data = fixheader + sizeof(row_marker);
+	data = mp_encode_uint(data, len);
+	/* Encode crc32 of the previous row, then crc32 of the current row */
+	data = mp_encode_uint(data, crc32p);
+	data = mp_encode_uint(data, crc32c);
+	/* Encode padding as an MP_STR that fills the rest of the header */
+	ssize_t padding = XLOG_FIXHEADER_SIZE - (data - fixheader);
+	if (padding > 0)
+		data = mp_encode_strl(data, padding - 1) + padding - 1;
+	assert(data == fixheader + XLOG_FIXHEADER_SIZE);
+	iov[0].iov_base = fixheader;
+	iov[0].iov_len = XLOG_FIXHEADER_SIZE;
+
+	assert(iovcnt <= XLOG_ROW_IOVMAX);
+	return iovcnt;
 }
 
 void
@@ -288,11 +331,10 @@ log_io_cursor_close(struct log_io_cursor *i)
  * @param i	iterator object, encapsulating log specifics.
  *
  */
-const struct log_row *
-log_io_cursor_next(struct log_io_cursor *i)
+int
+log_io_cursor_next(struct log_io_cursor *i, struct iproto_packet *packet)
 {
 	struct log_io *l = i->log;
-	const struct log_row *row;
 	log_magic_t magic;
 	off_t marker_offset = 0;
 
@@ -331,11 +373,10 @@ log_io_cursor_next(struct log_io_cursor *i)
 			(uintmax_t)i->good_offset);
 	say_debug("magic found at 0x%08jx", (uintmax_t)marker_offset);
 
-	row = row_reader(l->f);
-	if (row == &ROW_EOF)
-		goto eof;
-
-	if (row == NULL) {
+	try {
+		if (row_reader(l->f, packet) != 0)
+			goto eof;
+	} catch (Exception *e) {
 		if (l->dir->panic_if_error)
 			panic("failed to read row");
 		say_warn("failed to read row");
@@ -348,7 +389,7 @@ log_io_cursor_next(struct log_io_cursor *i)
 	if (i->row_count % 100000 == 0)
 		say_info("%.1fM rows processed", i->row_count / 1000000.);
 
-	return row;
+	return 0;
 eof:
 	/*
 	 * The only two cases of fully read file:
@@ -380,7 +421,7 @@ log_io_cursor_next(struct log_io_cursor *i)
 		}
 	}
 	/* No more rows. */
-	return NULL;
+	return 1;
 }
 
 /* }}} */
diff --git a/src/log_io.h b/src/log_io.h
index 0ecc56669a49329d79af1915990e245fc34f3f0f..764d767091f11a60bd7531788748c9e2610b54b6 100644
--- a/src/log_io.h
+++ b/src/log_io.h
@@ -31,8 +31,10 @@
 #include <stdio.h>
 #include <limits.h>
 #include <stdbool.h>
+#include <sys/uio.h>
 #include "trivia/util.h"
 #include "tarantool_ev.h"
+#include "iproto_constants.h"
 
 extern const uint32_t xlog_format;
 
@@ -111,40 +113,15 @@ log_io_cursor_open(struct log_io_cursor *i, struct log_io *l);
 void
 log_io_cursor_close(struct log_io_cursor *i);
 
-const struct log_row *
-log_io_cursor_next(struct log_io_cursor *i);
+int
+log_io_cursor_next(struct log_io_cursor *i, struct iproto_packet *packet);
+int
+xlog_encode_row(const struct iproto_packet *packet, struct iovec *iov,
+		char fixheader[XLOG_FIXHEADER_SIZE]);
+enum { XLOG_ROW_IOVMAX = IPROTO_PACKET_IOVMAX + 1 };
 
 typedef uint32_t log_magic_t;
 
-struct log_row {
-	log_magic_t marker;
-	uint32_t header_crc32c; /* calculated for the header block */
-	/* {{{ header block */
-	char header[0]; /* start of the header */
-	int64_t lsn;
-	double tm;
-	uint32_t len;
-	uint16_t tag;
-	uint64_t cookie;
-	uint32_t data_crc32c; /* calculated for data */
-	/* }}} */
-	char data[0];   /* start of the data */
-} __attribute__((packed));
-
-void
-log_row_sign(struct log_row *row);
-
-void
-log_row_fill(struct log_row *row, int64_t lsn, uint64_t cookie,
-	     const char *metadata, size_t metadata_len, const char *data,
-	     size_t data_len);
-
-static inline size_t
-log_row_size(const struct log_row *row)
-{
-	return sizeof(struct log_row) + row->len;
-}
-
 int
 inprogress_log_unlink(char *filename);
 int
diff --git a/src/recovery.cc b/src/recovery.cc
index 31c0b01f87a7e16f142f059e42de691c10561435..0efbc2e7308c7655674d3510d52210c66f40a39b 100644
--- a/src/recovery.cc
+++ b/src/recovery.cc
@@ -40,6 +40,9 @@
 
 #include "replica.h"
 #include "fiber.h"
+#include "msgpuck/msgpuck.h"
+#include "iproto_constants.h"
+#include "crc32.h"
 
 /*
  * Recovery subsystem
@@ -207,7 +210,7 @@ recovery_stop_local(struct recovery_state *r);
 void
 recovery_init(const char *snap_dirname, const char *wal_dirname,
 	      row_handler row_handler, void *row_handler_param,
-	      int rows_per_wal)
+	      snapshot_handler snapshot_handler, int rows_per_wal)
 {
 	assert(recovery_state == NULL);
 	recovery_state = (struct recovery_state *) calloc(1, sizeof(struct recovery_state));
@@ -219,6 +222,8 @@ recovery_init(const char *snap_dirname, const char *wal_dirname,
 	r->row_handler = row_handler;
 	r->row_handler_param = row_handler_param;
 
+	r->snapshot_handler = snapshot_handler;
+
 	r->snap_dir = &snap_dir;
 	r->snap_dir->dirname = strdup(snap_dirname);
 	r->wal_dir = &wal_dir;
@@ -291,7 +296,7 @@ recovery_setup_panic(struct recovery_state *r, bool on_snap_error, bool on_wal_e
  *  @return panics on error
  *  Errors are logged to the log file.
  */
-static void
+void
 init_storage_on_master(struct log_dir *dir)
 {
 	const char *filename = format_filename(dir, 1 /* lsn */, NONE);
@@ -307,47 +312,13 @@ init_storage_on_master(struct log_dir *dir)
 			       filename);
 	}
 	close(fd);
-}
-
-/** Download the latest snapshot from master. */
-static void
-init_storage_on_replica(struct log_dir *dir, const char *replication_source)
-{
-	say_info("downloading snapshot from master %s...",
-		 replication_source);
-
-	int master = replica_bootstrap(replication_source);
-	FDGuard guard_master(master);
-
-	struct {
-		uint64_t lsn;
-		uint64_t file_size;
-	} response;
-	sio_readn(master, &response, sizeof(response));
-
-	const char *filename = format_filename(dir, response.lsn, NONE);
-	say_info("saving snapshot `%s'", filename);
-	int fd = open(filename, O_WRONLY|O_CREAT|O_EXCL, dir->mode);
-	if (fd == -1) {
-		panic_syserror("failed to open snapshot file `%s' for "
-			       "writing", filename);
-	}
-	FDGuard guard_fd(fd);
-
-	sio_recvfile(master, fd, NULL, response.file_size);
-}
-
-/** Create the initial snapshot file in the snap directory. */
-void
-init_storage(struct log_dir *dir, const char *replication_source)
-{
-	if (replication_source)
-		init_storage_on_replica(dir, replication_source);
-	else
-		init_storage_on_master(dir);
 	say_info("done");
 }
 
+/**
+ * Read a snapshot and call row_handler for every snapshot row.
+ * Panic in case of error.
+ */
 /**
  * Read a snapshot and call row_handler for every snapshot row.
  * Panic in case of error.
@@ -361,12 +332,21 @@ recover_snap(struct recovery_state *r, const char *replication_source)
 
 	struct log_io *snap;
 	int64_t lsn;
+	int rc = 0;
 
 	lsn = greatest_lsn(r->snap_dir);
 	if (lsn == 0 && greatest_lsn(r->wal_dir) == 0) {
 		say_info("found an empty data directory, initializing...");
-		init_storage(r->snap_dir, replication_source);
-		lsn = greatest_lsn(r->snap_dir);
+		if (replication_source) {
+			/* play rows and save snapshot */
+			replica_bootstrap(r, replication_source);
+			snapshot_save(r);
+			assert(r->lsn == greatest_lsn(r->snap_dir));
+			return;
+		} else {
+			init_storage_on_master(r->snap_dir);
+			lsn = greatest_lsn(r->snap_dir);
+		}
 	}
 
 	if (lsn <= 0) {
@@ -383,18 +363,19 @@ recover_snap(struct recovery_state *r, const char *replication_source)
 
 	log_io_cursor_open(&i, snap);
 
-	const struct log_row *row;
-	while ((row = log_io_cursor_next(&i))) {
-		if (r->row_handler(r->row_handler_param, row) < 0) {
+	struct iproto_packet packet;
+	while (log_io_cursor_next(&i, &packet) == 0) {
+		if (r->row_handler(r->row_handler_param, &packet) < 0) {
 			say_error("can't apply row");
+			rc = snap->dir->panic_if_error; /* fatal only if strict */
 			if (snap->dir->panic_if_error)
 				break;
 		}
 	}
 	log_io_cursor_close(&i);
 	log_io_close(&snap);
 
-	if (row == NULL) {
+	if (rc == 0) {
 		r->lsn = r->confirmed_lsn = lsn;
 		say_info("snapshot recovered, confirmed lsn: %"
 			 PRIi64, r->confirmed_lsn);
@@ -415,7 +396,7 @@ recover_snap(struct recovery_state *r, const char *replication_source)
  * @retval 0 EOF
  * @retval 1 ok, maybe read something
  */
-static int
+int
 recover_wal(struct recovery_state *r, struct log_io *l)
 {
 	int res = -1;
@@ -423,9 +404,9 @@ recover_wal(struct recovery_state *r, struct log_io *l)
 
 	log_io_cursor_open(&i, l);
 
-	const struct log_row *row;
-	while ((row = log_io_cursor_next(&i))) {
-		if (row->lsn <= r->confirmed_lsn) {
+	struct iproto_packet packet;
+	while (log_io_cursor_next(&i, &packet) == 0) {
+		if (packet.lsn <= r->confirmed_lsn) {
 			say_debug("skipping too young row");
 			continue;
 		}
@@ -433,12 +414,12 @@ recover_wal(struct recovery_state *r, struct log_io *l)
 		 * After handler(row) returned, row may be
 		 * modified, do not use it.
 		 */
-		if (r->row_handler(r->row_handler_param, row) < 0) {
+		if (r->row_handler(r->row_handler_param, &packet) < 0) {
 			say_error("can't apply row");
 			if (l->dir->panic_if_error)
 				goto end;
 		}
-		set_lsn(r, row->lsn);
+		set_lsn(r, packet.lsn);
 	}
 	res = i.eof_read ? LOG_EOF : 1;
 end:
@@ -773,7 +754,8 @@ struct wal_write_request {
 	/* Auxiliary. */
 	int res;
 	struct fiber *fiber;
-	struct log_row row;
+	struct iproto_packet *packet;
+	char wal_fixheader[XLOG_FIXHEADER_SIZE];
 };
 
 /* Context of the WAL writer thread. */
@@ -1102,10 +1084,11 @@ wal_fill_batch(struct log_io *wal, struct fio_batch *batch, int rows_per_wal,
 	/* Post-condition of successful wal_opt_rotate(). */
 	assert(max_rows > 0);
 	fio_batch_start(batch, max_rows);
-	while (req != NULL && ! fio_batch_is_full(batch)) {
-		struct log_row *row = &req->row;
-		log_row_sign(row);
-		fio_batch_add(batch, row, log_row_size(row));
+
+	struct iovec iov[XLOG_ROW_IOVMAX];
+	while (req != NULL && !fio_batch_has_space(batch, nelem(iov))) {
+		int iovcnt = xlog_encode_row(req->packet, iov, req->wal_fixheader);
+		fio_batch_add(batch, iov, iovcnt);
 		req = STAILQ_NEXT(req, wal_fifo_entry);
 	}
 	return req;
@@ -1137,7 +1120,7 @@ wal_write_to_disk(struct recovery_state *r, struct wal_writer *writer,
 
 	while (req) {
 		if (wal_opt_rotate(wal, r->rows_per_wal, r->wal_dir,
-				   req->row.lsn) != 0)
+				   req->packet->lsn) != 0)
 			break;
 		struct wal_write_request *batch_end;
 		batch_end = wal_fill_batch(*wal, batch, r->rows_per_wal, req);
@@ -1194,23 +1177,22 @@ wal_writer_thread(void *worker_args)
  * to be written to disk and wait until this task is completed.
  */
 int
-wal_write(struct recovery_state *r, int64_t lsn, uint64_t cookie,
-	  uint16_t op, const char *row, uint32_t row_len)
+wal_write(struct recovery_state *r, struct iproto_packet *packet)
 {
 	assert(r->wal_mode != WAL_NONE);
-	say_debug("wal_write lsn=%" PRIi64, lsn);
+	say_debug("wal_write lsn=%" PRIi64, packet->lsn);
 	ERROR_INJECT_RETURN(ERRINJ_WAL_IO);
 
 	struct wal_writer *writer = r->writer;
 
 	struct wal_write_request *req = (struct wal_write_request *)
-		region_alloc(&fiber()->gc, sizeof(struct wal_write_request) +
-			     sizeof(op) + row_len);
+		region_alloc(&fiber()->gc, sizeof(struct wal_write_request));
 
 	req->fiber = fiber();
 	req->res = -1;
-	log_row_fill(&req->row, lsn, cookie, (const char *) &op,
-		     sizeof(op), row, row_len);
+	req->packet = packet;
+	packet->tm = ev_now(loop());
+	packet->sync = 0;
 
 	(void) tt_pthread_mutex_lock(&writer->mutex);
 
@@ -1232,9 +1214,7 @@ wal_write(struct recovery_state *r, int64_t lsn, uint64_t cookie,
 /* {{{ box.snapshot() */
 
 void
-snapshot_write_row(struct log_io *l,
-		   const char *metadata, size_t metadata_len,
-		   const char *data, size_t data_len)
+snapshot_write_row(struct log_io *l, struct iproto_packet *packet)
 {
 	static int rows;
 	static uint64_t bytes;
@@ -1242,24 +1222,26 @@ snapshot_write_row(struct log_io *l,
 	static ev_tstamp last = 0;
 	ev_loop *loop = loop();
 
-	struct log_row *row = (struct log_row *) region_alloc(&fiber()->gc,
-		sizeof(*row) + data_len + metadata_len);
-
-	log_row_fill(row, ++rows, snapshot_cookie, metadata,
-		     metadata_len, data, data_len);
-	log_row_sign(row);
-
-	if (rows % 100000 == 0)
-		say_crit("%.1fM rows written", rows / 1000000.);
+	packet->tm = last;
+	packet->lsn = ++rows;
+	packet->sync = 0; /* don't write sync to wal */
 
-	size_t written = fwrite(row, 1, log_row_size(row), l->f);
+	char fixheader[XLOG_FIXHEADER_SIZE];
+	struct iovec iov[XLOG_ROW_IOVMAX];
+	int iovcnt = xlog_encode_row(packet, iov, fixheader);
 
-	if (written != log_row_size(row)) {
-		say_error("Can't write row (%zu bytes)", log_row_size(row));
-		panic_syserror("snapshot_write_row");
+	/* TODO: use writev here */
+	for (int i = 0; i < iovcnt; i++) {
+		if (fwrite(iov[i].iov_base, iov[i].iov_len, 1, l->f) != 1) {
+			say_error("Can't write row (%zu bytes)",
+				  iov[i].iov_len);
+			panic_syserror("snapshot_write_row");
+		}
+		bytes += iov[i].iov_len;
 	}
 
-	bytes += written;
+	if (rows % 100000 == 0)
+		say_crit("%.1fM rows written", rows / 1000000.);
 
 	region_free_after(&fiber()->gc, 128 * 1024);
 
@@ -1302,8 +1284,9 @@ snapshot_write_row(struct log_io *l,
 }
 
 void
-snapshot_save(struct recovery_state *r, void (*f) (struct log_io *))
+snapshot_save(struct recovery_state *r)
 {
+	assert(r->snapshot_handler != NULL);
 	struct log_io *snap;
 	snap = log_io_open_for_write(r->snap_dir, r->confirmed_lsn,
 				     INPROGRESS);
@@ -1317,8 +1300,8 @@ snapshot_save(struct recovery_state *r, void (*f) (struct log_io *))
 	say_info("saving snapshot `%s'",
 		 format_filename(r->snap_dir, r->confirmed_lsn,
 				 NONE));
-	if (f)
-		f(snap);
+
+	r->snapshot_handler(snap);
 
 	log_io_close(&snap);
 
diff --git a/src/recovery.h b/src/recovery.h
index 03b31cdfad10390481ce5538718c8f24637fe771..2d1eac753a1b24bfd6159328b5dec0a7f611841a 100644
--- a/src/recovery.h
+++ b/src/recovery.h
@@ -32,6 +32,7 @@
 
 #include "trivia/util.h"
 #include "tarantool_ev.h"
+#include "log_io.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -40,7 +41,8 @@ extern "C" {
 struct fiber;
 struct tbuf;
 
-typedef int (row_handler)(void *, const struct log_row *row);
+typedef int (row_handler)(void *, struct iproto_packet *packet);
+typedef void (snapshot_handler)(struct log_io *);
 
 /** A "condition variable" that allows fibers to wait when a given
  * LSN makes it to disk.
@@ -88,6 +90,7 @@ struct recovery_state {
 	 */
 	row_handler *row_handler;
 	void *row_handler_param;
+	snapshot_handler *snapshot_handler;
 	uint64_t snap_io_rate_limit;
 	int rows_per_wal;
 	double wal_fsync_delay;
@@ -101,18 +104,20 @@ extern struct recovery_state *recovery_state;
 
 void recovery_init(const char *snap_dirname, const char *xlog_dirname,
 		   row_handler row_handler, void *row_handler_param,
-		   int rows_per_wal);
+		   snapshot_handler snapshot_handler, int rows_per_wal);
 void recovery_update_mode(struct recovery_state *r,
 			  const char *wal_mode, double fsync_delay);
 void recovery_update_io_rate_limit(struct recovery_state *r,
 				   double new_limit);
 void recovery_free();
-void recover_snap(struct recovery_state *, const char *replication_source);
+void recover_snap(struct recovery_state *r, const char *replication_source);
 void recover_existing_wals(struct recovery_state *);
 void recovery_follow_local(struct recovery_state *r, ev_tstamp wal_dir_rescan_delay);
 void recovery_finalize(struct recovery_state *r);
-int wal_write(struct recovery_state *r, int64_t lsn, uint64_t cookie,
-	      uint16_t op, const char *data, uint32_t len);
+
+int
+recover_wal(struct recovery_state *r, struct log_io *l); /* for replication */
+int wal_write(struct recovery_state *r, struct iproto_packet *packet);
 
 void recovery_setup_panic(struct recovery_state *r, bool on_snap_error, bool on_wal_error);
 
@@ -124,13 +129,12 @@ void recovery_wait_lsn(struct recovery_state *r, int64_t lsn);
 
 struct fio_batch;
 
-void snapshot_write_row(struct log_io *i,
-			const char *metadata, size_t metadata_size,
-			const char *data, size_t data_size);
-void snapshot_save(struct recovery_state *r, void (*loop) (struct log_io *));
+void
+snapshot_write_row(struct log_io *l, struct iproto_packet *packet);
+void snapshot_save(struct recovery_state *r);
 
 void
-init_storage(struct log_dir *dir, const char *replication_source);
+init_storage_on_master(struct log_dir *dir);
 
 #if defined(__cplusplus)
 } /* extern "C" */
diff --git a/src/replica.cc b/src/replica.cc
index 18ab56e1a0088de5745f5820646fd20f44da88fc..66b20dfee5fc1e6d6544a69155954075674b6ace 100644
--- a/src/replica.cc
+++ b/src/replica.cc
@@ -39,55 +39,86 @@
 #include "coio_buf.h"
 #include "recovery.h"
 #include "tarantool.h"
+#include "iproto.h"
 #include "iproto_constants.h"
 #include "msgpuck/msgpuck.h"
 #include "replica.h"
 
 static void
-remote_apply_row(struct recovery_state *r, const struct log_row *row);
+remote_apply_row(struct recovery_state *r, struct iproto_packet *packet);
 
-static const struct log_row *
-remote_read_row(struct ev_io *coio, struct iobuf *iobuf)
+static void
+remote_remote_read_row_fd(struct ev_io *coio, struct iobuf *iobuf,
+		 struct iproto_packet *packet)
 {
 	struct ibuf *in = &iobuf->in;
-	ssize_t to_read = sizeof(struct log_row) - ibuf_size(in);
 
-	if (to_read > 0) {
-		ibuf_reserve(in, cfg_readahead);
-		coio_breadn(coio, in, to_read);
+	/* Buffer the fixed-size prefix: len + (padding) */
+	if (ibuf_size(in) < IPROTO_FIXHEADER_SIZE)
+		coio_breadn(coio, in, IPROTO_FIXHEADER_SIZE - ibuf_size(in));
+
+	/* Read length */
+	if (mp_typeof(*in->pos) != MP_UINT) {
+		tnt_raise(ClientError, ER_INVALID_MSGPACK,
+			  "invalid fixed header");
 	}
 
-	ssize_t request_len = ((const struct log_row *) in->pos)->len +
-			sizeof(struct log_row);
-	to_read = request_len - ibuf_size(in);
+	const char *data = in->pos;
+	uint32_t len = mp_decode_uint(&data);
+	if (len > IPROTO_BODY_LEN_MAX) {
+		tnt_raise(ClientError, ER_INVALID_MSGPACK,
+			  "received packet is too big");
+	}
+	in->pos += IPROTO_FIXHEADER_SIZE; /* skip len and its padding */
 
+	/* Buffer the remaining len bytes: packet header and body */
+	ssize_t to_read = len - ibuf_size(in);
 	if (to_read > 0)
 		coio_breadn(coio, in, to_read);
 
-	const struct log_row *row = (const struct log_row *) in->pos;
-	in->pos += request_len;
-	return row;
+	iproto_packet_decode(packet, (const char **) &in->pos, in->pos + len);
 }
 
-struct iproto_subscribe_request {
-	uint8_t v_len;                         /* length */
-	uint8_t m_header;                       /* MP_MAP */
-	uint8_t k_code;                         /* IPROTO_CODE */
-	uint8_t v_code;                        /* response status */
-	uint8_t m_body;                         /* MP_MAP */
-	uint8_t k_data;                         /* IPROTO_OFFSET */
-	uint8_t m_data;                         /* MP_UINT64 */
-	uint64_t lsn;                           /* lsn */
-} __attribute__((packed));
-
-static const struct iproto_subscribe_request iproto_subscribe_request = {
-	sizeof(struct iproto_subscribe_request) - 1,
-	0x81, IPROTO_CODE, IPROTO_SUBSCRIBE,
-	0x81, IPROTO_OFFSET, 0xcf, 0
-};
-
-int
-replica_bootstrap(const char *replication_source)
+/* Read and decode one packet from a socket using blocking I/O. */
+static void
+remote_read_row_fd(int sock, struct iproto_packet *packet)
+{
+	const char *data;
+
+	/* Read the whole fixed header; a plain read may return fewer bytes */
+	char fixheader[IPROTO_FIXHEADER_SIZE];
+	if (sio_readn(sock, fixheader, sizeof(fixheader)) != sizeof(fixheader)) {
+error:
+		tnt_raise(ClientError, ER_INVALID_MSGPACK,
+			  "invalid fixed header");
+	}
+	data = fixheader;
+	if (mp_check(&data, data + sizeof(fixheader)) != 0)
+		goto error;
+	data = fixheader;
+
+	/* Read length */
+	if (mp_typeof(*data) != MP_UINT)
+		goto error;
+	uint32_t len = mp_decode_uint(&data);
+	if (len > IPROTO_BODY_LEN_MAX) {
+		tnt_raise(ClientError, ER_INVALID_MSGPACK,
+			  "received packet is too big");
+	}
+
+	/* Read all len bytes of header and body */
+	char *bodybuf = (char *) region_alloc(&fiber()->gc, len);
+	if (sio_readn(sock, bodybuf, len) != len) {
+		tnt_raise(ClientError, ER_INVALID_MSGPACK,
+			  "invalid row - can't read");
+	}
+
+	data = bodybuf;
+	iproto_packet_decode(packet, &data, data + len);
+}
+
+void
+replica_bootstrap(struct recovery_state *r, const char *replication_source)
 {
 	char ip_addr[32];
 	char greeting[IPROTO_GREETING_SIZE];
@@ -109,13 +140,36 @@ replica_bootstrap(const char *replication_source)
 
 	int master = sio_socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
 	FDGuard guard(master);
+
+	assert(r->confirmed_lsn == 0 && r->lsn == 0);
+	uint64_t sync = rand(); /* every reply must carry this value */
+
+	/* Send JOIN request */
+	struct iproto_subscribe subscribe = iproto_subscribe_stub;
+	subscribe.sync = mp_bswap_u64(sync);
 	sio_connect(master, &addr, sizeof(addr));
 	sio_readn(master, greeting, sizeof(greeting));
-	sio_writen(master, &iproto_subscribe_request,
-		  sizeof(iproto_subscribe_request));
+	sio_writen(master, &subscribe, sizeof(subscribe)); /* whole request */
+
+	while (true) {
+		struct iproto_packet packet;
+
+		remote_read_row_fd(master, &packet);
+		if (packet.sync != sync)
+			tnt_raise(IllegalParams, "unexpected packet");
+
+		/* Recv JOIN response (= end of stream) */
+		if (packet.code == IPROTO_SUBSCRIBE) {
+			if (packet.bodycnt != 0)
+				tnt_raise(IllegalParams, "subscribe response body");
+			set_lsn(r, packet.lsn);
+			say_info("done");
+			break;
+		}
 
-	guard.fd = -1;
-	return master;
+		remote_apply_row(r, &packet);
+	}
+	/* master socket closed by guard */
 }
 
 static void
@@ -129,7 +183,8 @@ remote_connect(struct ev_io *coio, struct sockaddr_in *remote_addr,
 	coio_connect(coio, remote_addr);
 	coio_readn(coio, greeting, sizeof(greeting));
 
-	struct iproto_subscribe_request request = iproto_subscribe_request;
+	/* Send JOIN request */
+	struct iproto_subscribe request = iproto_subscribe_stub;
 	request.lsn = mp_bswap_u64(initial_lsn);
 	coio_write(coio, &request, sizeof(request));
 
@@ -165,15 +220,16 @@ pull_from_remote(va_list ap)
 				      "connected");
 			}
 			err = "can't read row";
-			const struct log_row *row = remote_read_row(&coio, iobuf);
+			struct iproto_packet packet;
+			remote_remote_read_row_fd(&coio, iobuf, &packet);
 			fiber_setcancellable(false);
 			err = NULL;
 
-			r->remote->recovery_lag = ev_now(loop) - row->tm;
+			r->remote->recovery_lag = ev_now(loop) - packet.tm;
 			r->remote->recovery_last_update_tstamp =
 				ev_now(loop);
 
-			remote_apply_row(r, row);
+			remote_apply_row(r, &packet);
 
 			iobuf_gc(iobuf);
 			fiber_gc();
@@ -213,14 +269,12 @@ pull_from_remote(va_list ap)
 }
 
 static void
-remote_apply_row(struct recovery_state *r, const struct log_row *row)
+remote_apply_row(struct recovery_state *r, struct iproto_packet *packet)
 {
-	assert(row->tag == WAL);
-
-	if (r->row_handler(r->row_handler_param, row) < 0)
+	if (r->row_handler(r->row_handler_param, packet) < 0)
 		panic("replication failure: can't apply row");
 
-	set_lsn(r, row->lsn);
+	set_lsn(r, packet->lsn);
 }
 
 void
diff --git a/src/replica.h b/src/replica.h
index 0820f7deddeda2525fa66bc9321621f978c89ff2..553a4dde01a55a1ff4a4f238fbfe4232685597bc 100644
--- a/src/replica.h
+++ b/src/replica.h
@@ -48,8 +48,8 @@ struct remote {
  * @return A connected socket, ready too receive
  * data.
  */
-int
-replica_bootstrap(const char *replication_source);
+void
+replica_bootstrap(struct recovery_state *r, const char *replication_source);
 
 void
 recovery_follow_remote(struct recovery_state *r, const char *addr);
diff --git a/src/replication.cc b/src/replication.cc
index 0a2eb143e10317a48ad37603f1dbc8e33c5d329a..eca1dd64528ea3e635451015ec2f57bb187d9b6e 100644
--- a/src/replication.cc
+++ b/src/replication.cc
@@ -49,6 +49,8 @@ extern "C" {
 #include "recovery.h"
 #include "log_io.h"
 #include "evio.h"
+#include "iproto_constants.h"
+#include "msgpuck/msgpuck.h"
 
 /** Replication topology
  * ----------------------
@@ -86,6 +88,7 @@ struct replica {
 	int sock;
 	/** Initial lsn. */
 	int64_t lsn;
+	uint64_t sync;
 } replica;
 
 /** Send a file descriptor to replication relay spawner.
@@ -194,11 +197,12 @@ struct subscribe_request {
 	struct ev_io io;
 	int fd;
 	int64_t lsn;
+	uint64_t sync;
 };
 
 /** Replication acceptor fiber handler. */
 void
-subscribe(int fd, int64_t lsn)
+subscribe(int fd, int64_t lsn, uint64_t sync)
 {
 	struct subscribe_request *request = (struct subscribe_request *)
 		malloc(sizeof(struct subscribe_request));
@@ -209,6 +213,7 @@ subscribe(int fd, int64_t lsn)
 	request->fd = fd;
 	request->io.data = request;
 	request->lsn = lsn;
+	request->sync = sync;
 	ev_io_init(&request->io, replication_send_socket,
 		   master_to_spawner_socket, EV_WRITE);
 	ev_io_start(loop(), &request->io);
@@ -227,7 +232,7 @@ replication_send_socket(ev_loop *loop, ev_io *watcher, int /* events */)
 	struct cmsghdr *control_message = NULL;
 
 	iov.iov_base = &request->lsn;
-	iov.iov_len = sizeof(request->lsn);
+	iov.iov_len = sizeof(request->lsn) + sizeof(request->sync);
 
 	memset(&msg, 0, sizeof(msg));
 
@@ -342,7 +347,7 @@ spawner_main_loop()
 	char control_buf[CMSG_SPACE(sizeof(int))];
 
 	iov.iov_base = &replica.lsn;
-	iov.iov_len = sizeof(replica.lsn);
+	iov.iov_len = sizeof(replica.lsn) + sizeof(replica.sync);
 
 	msg.msg_name = NULL;
 	msg.msg_namelen = 0;
@@ -537,56 +542,102 @@ replication_relay_recv(ev_loop * /* loop */, struct ev_io *w, int __attribute__(
 	exit(EXIT_FAILURE);
 }
 
+/* Blocking I/O only: write the whole iov[], resuming after short writes. */
+static inline ssize_t
+sio_writev_all(int fd, struct iovec *iov, int iovcnt)
+{
+	ssize_t bytes_total = 0;
+	struct iovec *iovend = iov + iovcnt;
+	while(1) {
+		ssize_t bytes_written = sio_writev(fd, iov, iovend - iov);
+		bytes_total += bytes_written;
+		/* Drop fully written entries; never dereference iovend. */
+		while (iov < iovend && (size_t) bytes_written >= iov->iov_len)
+			bytes_written -= (iov++)->iov_len;
+		if (iov == iovend)
+			break;
+		iov->iov_base = (char *) iov->iov_base + bytes_written;
+		iov->iov_len -= bytes_written;
+	}
+	return bytes_total;
+}
+
+
+enum { IPROTO_ROW_IOVMAX = IPROTO_PACKET_IOVMAX + 1 };
+/* Put a fixed-size length header in iov[0] and the packet in iov[1..]; return the iovec count. */
+static int
+iproto_encode_row(const struct iproto_packet *packet, struct iovec *iov,
+		  char fixheader[IPROTO_FIXHEADER_SIZE])
+{
+	int iovcnt = iproto_packet_encode(packet, iov + 1) + 1;
+	uint32_t len = 0;
+	for (int i = 1; i < iovcnt; i++)
+		len += iov[i].iov_len;
+
+	/* Encode len (total bytes of iov[1..iovcnt)) at the start of fixheader */
+	char *data = fixheader;
+	data = mp_encode_uint(data, len);
+	/* Pad up to IPROTO_FIXHEADER_SIZE; NOTE(review): skipped bytes are not written here */
+	ssize_t padding = IPROTO_FIXHEADER_SIZE - (data - fixheader);
+	if (padding > 0)
+		data = mp_encode_strl(data, padding - 1) + padding - 1;
+	assert(data == fixheader + IPROTO_FIXHEADER_SIZE);
+	iov[0].iov_base = fixheader;
+	iov[0].iov_len = IPROTO_FIXHEADER_SIZE;
+
+	assert(iovcnt <= IPROTO_ROW_IOVMAX);
+	return iovcnt;
+}
 
 /** Send a single row to the client. */
 static int
-replication_relay_send_row(void * /* param */, const struct log_row *row)
+replication_relay_send_row(void * /* param */, struct iproto_packet *packet)
 {
-	ssize_t bytes, len = log_row_size(row);
-	while (len > 0) {
-		bytes = write(replica.sock, row, len);
-		if (bytes < 0) {
-			if (errno == EPIPE) {
-				/* socket closed on opposite site */
-				goto shutdown_handler;
-			}
-			panic_syserror("write");
-		}
-		len -= bytes;
-		row += bytes;
+	try {
+		packet->sync = replica.sync;
+		/* Frame the row (fixed header + packet) and write it out in full */
+		struct iovec iov[IPROTO_ROW_IOVMAX];
+		char fixheader[IPROTO_FIXHEADER_SIZE];
+		int iovcnt = iproto_encode_row(packet, iov, fixheader);
+		sio_writev_all(replica.sock, iov, iovcnt);
+	} catch(SocketError *e) {
+		say_info("the client has closed its replication socket, exiting");
+		exit(EXIT_SUCCESS);
 	}
 
 	return 0;
-shutdown_handler:
-	say_info("the client has closed its replication socket, exiting");
-	exit(EXIT_SUCCESS);
 }
 
 static void
-replication_relay_send_snapshot()
+replication_relay_join(struct recovery_state *r, uint64_t sync)
 {
 	FDGuard guard_replica(replica.sock);
-	struct log_dir dir = snap_dir;
-	dir.dirname = cfg.snap_dir;
-	int64_t lsn = greatest_lsn(&dir);
-	const char *filename = format_filename(&dir, lsn, NONE);
-	int snapshot = open(filename, O_RDONLY);
-	if (snapshot < 0)
-		panic_syserror("can't find/open snapshot");
-
-	FDGuard guard_snapshot(snapshot);
-
-	struct stat st;
-	if (fstat(snapshot, &st) != 0)
-		panic_syserror("fstat");
-
-	uint64_t header[2];
-	header[0] = lsn;
-	header[1] = st.st_size;
-	sio_writen(replica.sock, header, sizeof(header));
-	sio_sendfile(replica.sock, snapshot, NULL, header[1]);
 
+	int64_t lsn = greatest_lsn(r->snap_dir);
+	if (lsn <= 0)
+		panic("can't find snapshot");
+
+	struct log_io *snap = log_io_open_for_read(r->snap_dir, lsn, NONE);
+	if (snap == NULL)
+		panic("can't open snapshot");
+	say_info("sending snapshot `%s'", snap->filename);
+
+	/* Send rows */
+	int rc = recover_wal(r, snap);
+	log_io_close(&snap);
+
+	if (rc != 0)
+		panic("can't send snapshot");
+
+	/* Send response to JOIN command = end of stream */
+	struct iproto_subscribe response = iproto_subscribe_stub;
+	response.lsn = mp_bswap_u64(lsn);
+	response.sync = mp_bswap_u64(sync);
+	sio_write(replica.sock, &response, sizeof(response));
+
+	say_info("snapshot sent, lsn: %" PRIi64, lsn);
 	exit(EXIT_SUCCESS);
+	/* not reached; exit() does not unwind the stack, replica.sock is closed at process exit */
 }
 
 /** The main loop of replication client service process. */
@@ -633,8 +684,6 @@ replication_relay_loop()
 		say_syserror("sigaction");
 	}
 
-	if (replica.lsn == 0)
-		replication_relay_send_snapshot(); /* exits */
 	/*
 	 * Init a read event: when replica closes its end
 	 * of the socket, we can read EOF and shutdown the
@@ -649,11 +698,16 @@ replication_relay_loop()
 	/* Initialize the recovery process */
 	recovery_init(cfg.snap_dir, cfg.wal_dir,
 		      replication_relay_send_row,
-		      NULL, INT32_MAX);
+		      NULL, NULL, INT32_MAX);
 	/*
 	 * Note that recovery starts with lsn _NEXT_ to
 	 * the confirmed one.
 	 */
+	if (replica.lsn == 0) {
+		recovery_state->lsn = recovery_state->confirmed_lsn = 0;
+		replication_relay_join(recovery_state, replica.sync); /* exits */
+	}
+
 	recovery_state->lsn = recovery_state->confirmed_lsn = replica.lsn - 1;
 	recover_existing_wals(recovery_state);
 	/* Found nothing. */
diff --git a/src/replication.h b/src/replication.h
index 40ecc01adfbcc8a8643b9cb345405cbece2e320a..b42c15c36a0d6259ed843119d4b7fbf8225a9521 100644
--- a/src/replication.h
+++ b/src/replication.h
@@ -45,7 +45,7 @@ replication_prefork();
  * @return None. On error, closes the socket.
  */
 void
-subscribe(int fd, int64_t lsn);
+subscribe(int fd, int64_t lsn, uint64_t sync);
 
 #endif // TARANTOOL_REPLICATION_H_INCLUDED
 
diff --git a/src/session.cc b/src/session.cc
index 8a787fdd72e57ff5c166ddbb926edfdd435e1d07..1ddc10a8411c96a27f9667642f03eaa81d119332 100644
--- a/src/session.cc
+++ b/src/session.cc
@@ -83,14 +83,7 @@ session_create(int fd, uint64_t cookie)
 	 * fiber sid.
 	 */
 	fiber_set_session(fiber(), session);
-	try {
-		trigger_run(&session_on_connect, NULL);
-	} catch (Exception *e) {
-		fiber_set_session(fiber(), NULL);
-		mh_i32ptr_remove(session_registry, &node, NULL);
-		mempool_free(&session_pool, session);
-		throw;
-	}
+
 	/* Set session user to guest, until it is authenticated. */
 	session_set_user(session, GUEST, GUEST);
 	return session;
diff --git a/test/box/bad_trigger.result b/test/box/bad_trigger.result
index b40377d1712a5493a6b256b87c9530d8bb9d93cf..a7421fe612757e89b68d77ff8a3286d03f4325cf 100644
--- a/test/box/bad_trigger.result
+++ b/test/box/bad_trigger.result
@@ -10,7 +10,12 @@ box.session.on_connect(f1)
 ---
 ...
 select * from t0 where k0=0
-Connection is dead: Connection reset by peer.
+---
+- error:
+    errcode: ER_PROC_LUA
+    errmsg: [string "function f1() nosuchfunction() end"]:1: attempt to call global 'nosuchfunction' (a nil value)
+...
+Connection is alive.
 
 box.session.on_connect(nil, f1)
 ---
diff --git a/test/box/dup_key1.xlog b/test/box/dup_key1.xlog
index 08563a9fb2cb9836509d28764624e13ae2c2d2dd..2821f7db0b489ca138f7ce3d8869f384c1c96471 100644
Binary files a/test/box/dup_key1.xlog and b/test/box/dup_key1.xlog differ
diff --git a/test/box/dup_key2.xlog b/test/box/dup_key2.xlog
index fb10436343275d0d4f7b12af4a3746fa56f0c55b..052bfac1cd3392ee0609a962720aaeebf04eb2ef 100644
Binary files a/test/box/dup_key2.xlog and b/test/box/dup_key2.xlog differ
diff --git a/test/box/unfinished.xlog b/test/box/unfinished.xlog
index e689a9c7aedf8aa1874da05932643372abb40cec..5ebfa99f87438a2dbf7f67843b2b6e22f7a88e68 100644
Binary files a/test/box/unfinished.xlog and b/test/box/unfinished.xlog differ