From d785861a406850e680bc8581f8e946035856bbd2 Mon Sep 17 00:00:00 2001
From: Dmitry Simonenko <pmwkaa@gmail.com>
Date: Tue, 7 Apr 2015 18:13:44 +0300
Subject: [PATCH] gh-698: test and related fixes

* Fix gh-698: duplicate-key error followed by segfault

* Add long_run test suite, which include test case from the
ticket for memtx and sophia engines.
Result files must be zero-diff

Test script also has been fixed in a number of ways:
  a) crud operation on sophia space always return nil
  b) sophia_space:len() is not monotonous
  c) update of primary key is not allowed

* Fix return of vclock_sum() as wal_writer() result, reuse
it for txn->signature

* Fixed incorrect tuple guard usage in execute_delete()

* Fixed sophia delete operation: return old_tuple as an
exception for SophiaIndex::replace(). Do not skip
wal_write() and correctly set txn->signature

* Put engine commit code after wal_write(), reuse proper
txn->signature. Engine->commit yet can throw an exception
on concurrent transaction conflict or error

* rollback sophia transaction on conflict

* add second test for gh-698; rename and disable zlong_run tests

* make rollback statement engine specific

* make txn_finish() engine specific; use it both by commit/rollback to gc

* refactor execute_delete()

* upgrade sophia submodule: efficient msgpack storage

* engine api: style fixes
---
 src/box/engine.cc                           |   6 +
 src/box/engine.h                            |  19 ++-
 src/box/errcode.h                           |   1 +
 src/box/lua/call.cc                         |   2 +-
 src/box/memtx_engine.cc                     |  31 +++++
 src/box/memtx_engine.h                      |   2 +
 src/box/recovery.cc                         |   6 +-
 src/box/request.cc                          |  37 ++++--
 src/box/sophia_engine.cc                    |  48 ++++++--
 src/box/sophia_engine.h                     |  13 +-
 src/box/sophia_index.cc                     | 124 ++++++++++----------
 src/box/sophia_index.h                      |   1 -
 src/box/txn.cc                              |  67 +++--------
 src/box/txn.h                               |   4 +-
 test/box/misc.result                        |   1 +
 test/long_run/box.lua                       |  21 ++++
 test/long_run/memtx_delete_insert.result    |  11 ++
 test/long_run/memtx_delete_insert.test.lua  |   5 +
 test/long_run/memtx_dru.result              |  59 ++++++++++
 test/long_run/memtx_dru.test.lua            |  21 ++++
 test/long_run/sophia_delete_insert.result   |  11 ++
 test/long_run/sophia_delete_insert.test.lua |   5 +
 test/long_run/sophia_dru.result             |  59 ++++++++++
 test/long_run/sophia_dru.test.lua           |  21 ++++
 test/long_run/suite.ini                     |   9 ++
 test/long_run/suite.lua                     | 109 +++++++++++++++++
 test/sophia/suite.ini                       |   2 +-
 third_party/sophia                          |   2 +-
 28 files changed, 548 insertions(+), 149 deletions(-)
 create mode 100644 test/long_run/box.lua
 create mode 100644 test/long_run/memtx_delete_insert.result
 create mode 100644 test/long_run/memtx_delete_insert.test.lua
 create mode 100644 test/long_run/memtx_dru.result
 create mode 100644 test/long_run/memtx_dru.test.lua
 create mode 100644 test/long_run/sophia_delete_insert.result
 create mode 100644 test/long_run/sophia_delete_insert.test.lua
 create mode 100644 test/long_run/sophia_dru.result
 create mode 100644 test/long_run/sophia_dru.test.lua
 create mode 100644 test/long_run/suite.ini
 create mode 100644 test/long_run/suite.lua

diff --git a/src/box/engine.cc b/src/box/engine.cc
index 06a94f1662..c459c627ec 100644
--- a/src/box/engine.cc
+++ b/src/box/engine.cc
@@ -54,6 +54,12 @@ void Engine::commit(struct txn*)
 void Engine::rollback(struct txn*)
 {}
 
+void Engine::rollbackStmt(struct txn_stmt*)
+{}
+
+void Engine::finish(struct txn*, bool)
+{}
+
 void Engine::initSystemSpace(struct space * /* space */)
 {
 	panic("not implemented");
diff --git a/src/box/engine.h b/src/box/engine.h
index e88a9ac70d..c8570c5c2f 100644
--- a/src/box/engine.h
+++ b/src/box/engine.h
@@ -107,9 +107,22 @@ class Engine: public Object {
 	/**
 	 * Engine specific transaction life-cycle routines.
 	 */
-	virtual void begin(struct txn*, struct space*);
-	virtual void commit(struct txn*);
-	virtual void rollback(struct txn*);
+	virtual void begin(struct txn *, struct space *);
+	virtual void commit(struct txn *);
+	virtual void rollbackStmt(struct txn_stmt *);
+	virtual void rollback(struct txn *);
+	/**
+	 * Called at the end of a transaction, if a transaction
+	 * has committed. Is an
+	 * artifact of autocommit mode, when commit
+	 * happens *before* the result is sent to the client,
+	 * and is used to dereference tuples which otherwise
+	 * could have been freed already at commit.
+	 * Has nothing to do in case of rollback, since
+	 * in that case the client doesn't get any tuples
+	 * back, so they can be freed already in rollback.
+	 */
+	virtual void finish(struct txn *, bool);
 	/**
 	 * Recover the engine to a checkpoint it has.
 	 * After that the engine will be given rows
diff --git a/src/box/errcode.h b/src/box/errcode.h
index d8b679d260..8ef1cd1ed2 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -148,6 +148,7 @@ struct errcode_record {
 	/* 94 */_(ER_CANT_UPDATE_PRIMARY_KEY,	2, "Attempt to modify a tuple field which is part of index '%s' in space '%s'") \
 	/* 95 */_(ER_UPDATE_INTEGER_OVERFLOW,   2, "Integer overflow when performing '%c' operation on field %u") \
 	/* 96 */_(ER_GUEST_USER_PASSWORD,       2, "Setting password for guest user has no effect") \
+	/* 97 */_(ER_TRANSACTION_CONFLICT,      2, "Transaction has been aborted by conflict") \
 
 /*
  * !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/lua/call.cc b/src/box/lua/call.cc
index 52e928c1fe..dd218d6e2d 100644
--- a/src/box/lua/call.cc
+++ b/src/box/lua/call.cc
@@ -368,7 +368,7 @@ lbox_commit(lua_State * /* L */)
 		txn_rollback();
 		throw;
 	}
-	txn_finish(txn);
+	txn_finish(txn, true);
 	return 0;
 }
 
diff --git a/src/box/memtx_engine.cc b/src/box/memtx_engine.cc
index 7ccfa21799..a51fd2f29a 100644
--- a/src/box/memtx_engine.cc
+++ b/src/box/memtx_engine.cc
@@ -432,6 +432,19 @@ MemtxEngine::keydefCheck(struct space *space, struct key_def *key_def)
 	}
 }
 
+void
+MemtxEngine::rollbackStmt(struct txn_stmt *stmt)
+{
+	if (stmt->old_tuple || stmt->new_tuple)
+	{
+		space_replace(stmt->space,
+		              stmt->new_tuple,
+		              stmt->old_tuple, DUP_INSERT);
+		if (stmt->new_tuple)
+			tuple_unref(stmt->new_tuple);
+	}
+}
+
 void
 MemtxEngine::rollback(struct txn *txn)
 {
@@ -444,6 +457,24 @@ MemtxEngine::rollback(struct txn *txn)
 	}
 }
 
+void
+MemtxEngine::finish(struct txn *txn, bool commit)
+{
+	struct txn_stmt *stmt;
+	if (commit) {
+		rlist_foreach_entry(stmt, &txn->stmts, next) {
+			if (stmt->old_tuple)
+				tuple_unref(stmt->old_tuple);
+		}
+		return;
+	}
+	/* rollback */
+	rlist_foreach_entry(stmt, &txn->stmts, next) {
+		if (stmt->new_tuple)
+			tuple_unref(stmt->new_tuple);
+	}
+}
+
 void
 MemtxEngine::beginJoin()
 {
diff --git a/src/box/memtx_engine.h b/src/box/memtx_engine.h
index 2c4f339d58..46f61775be 100644
--- a/src/box/memtx_engine.h
+++ b/src/box/memtx_engine.h
@@ -46,7 +46,9 @@ struct MemtxEngine: public Engine {
 	virtual void dropPrimaryKey(struct space *space);
 	virtual bool needToBuildSecondaryKey(struct space *space);
 	virtual void keydefCheck(struct space *space, struct key_def *key_def);
+	virtual void rollbackStmt(struct txn_stmt*);
 	virtual void rollback(struct txn*);
+	virtual void finish(struct txn*, bool);
 	virtual void beginJoin();
 	virtual void recoverToCheckpoint(int64_t lsn);
 	virtual void endRecovery();
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index c2c1b1473d..49a0196f21 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -970,7 +970,7 @@ wal_write(struct recovery_state *r, struct xrow_header *row)
 	 */
 	fill_lsn(r, row);
 	if (r->wal_mode == WAL_NONE)
-		return 0;
+		return vclock_sum(&r->vclock);
 
 	ERROR_INJECT_RETURN(ERRINJ_WAL_IO);
 
@@ -1004,7 +1004,9 @@ wal_write(struct recovery_state *r, struct xrow_header *row)
 	bool cancellable = fiber_set_cancellable(false);
 	fiber_yield(); /* Request was inserted. */
 	fiber_set_cancellable(cancellable);
-	return req->res;
+	if (req->res == -1)
+		return -1;
+	return vclock_sum(&r->vclock);
 }
 
 /* }}} */
diff --git a/src/box/request.cc b/src/box/request.cc
index 479c1abcf4..e79ffbd22b 100644
--- a/src/box/request.cc
+++ b/src/box/request.cc
@@ -64,7 +64,13 @@ execute_replace(struct request *request, struct port *port)
 	enum dup_replace_mode mode = dup_replace_mode(request->type);
 
 	txn_replace(txn, space, NULL, new_tuple, mode);
-	txn_commit_stmt(txn, port);
+	txn_commit_stmt(txn);
+	/*
+	 * Adding result to port must be after possible WAL write.
+	 * The reason is that any yield between port_add_tuple and port_eof
+	 * calls could lead to sending not finished response to iproto socket.
+	 */
+	port_add_tuple(port, new_tuple);
 }
 
 static void
@@ -82,7 +88,7 @@ execute_update(struct request *request, struct port *port)
 	struct tuple *old_tuple = pk->findByKey(key, part_count);
 
 	if (old_tuple == NULL) {
-		txn_commit_stmt(txn, port);
+		txn_commit_stmt(txn);
 		return;
 	}
 	TupleGuard old_guard(old_tuple);
@@ -99,7 +105,13 @@ execute_update(struct request *request, struct port *port)
 	if (! engine_auto_check_update(space->handler->engine->flags))
 		space_check_update(space, old_tuple, new_tuple);
 	txn_replace(txn, space, old_tuple, new_tuple, DUP_REPLACE);
-	txn_commit_stmt(txn, port);
+	txn_commit_stmt(txn);
+	/*
+	 * Adding result to port must be after possible WAL write.
+	 * The reason is that any yield between port_add_tuple and port_eof
+	 * calls could lead to sending not finished response to iproto socket.
+	 */
+	port_add_tuple(port, new_tuple);
 }
 
 static void
@@ -116,15 +128,21 @@ execute_delete(struct request *request, struct port *port)
 	uint32_t part_count = mp_decode_array(&key);
 	primary_key_validate(pk->key_def, key, part_count);
 	struct tuple *old_tuple = pk->findByKey(key, part_count);
-
-	if (old_tuple != NULL) {
-		TupleGuard old_guard(old_tuple);
-		txn_replace(txn, space, old_tuple, NULL, DUP_REPLACE_OR_INSERT);
+	if (old_tuple == NULL) {
+		txn_commit_stmt(txn);
+		return;
 	}
-	txn_commit_stmt(txn, port);
+	TupleGuard old_guard(old_tuple);
+	txn_replace(txn, space, old_tuple, NULL, DUP_REPLACE_OR_INSERT);
+	txn_commit_stmt(txn);
+	/*
+	 * Adding result to port must be after possible WAL write.
+	 * The reason is that any yield between port_add_tuple and port_eof
+	 * calls could lead to sending not finished response to iproto socket.
+	 */
+	port_add_tuple(port, old_tuple);
 }
 
-
 static void
 execute_select(struct request *request, struct port *port)
 {
@@ -142,6 +160,7 @@ execute_select(struct request *request, struct port *port)
 	enum iterator_type type = (enum iterator_type) request->iterator;
 
 	const char *key = request->key;
+
 	uint32_t part_count = key ? mp_decode_array(&key) : 0;
 
 	struct iterator *it = index->position();
diff --git a/src/box/sophia_engine.cc b/src/box/sophia_engine.cc
index 6330a411f9..f5f807fe30 100644
--- a/src/box/sophia_engine.cc
+++ b/src/box/sophia_engine.cc
@@ -320,15 +320,6 @@ SophiaEngine::commit(struct txn *txn)
 {
 	if (txn->engine_tx == NULL)
 		return;
-	/* free involved tuples */
-	struct txn_stmt *stmt;
-	rlist_foreach_entry(stmt, &txn->stmts, next) {
-		if (! stmt->new_tuple)
-			continue;
-		assert(stmt->new_tuple->refs >= 1 &&
-		       stmt->new_tuple->refs < UINT8_MAX);
-		tuple_unref(stmt->new_tuple);
-	}
 	auto scoped_guard = make_scoped_guard([=] {
 		txn->engine_tx = NULL;
 	});
@@ -336,14 +327,33 @@ SophiaEngine::commit(struct txn *txn)
 	 * commit signature */
 	assert(txn->signature >= 0);
 	int rc = sp_prepare(txn->engine_tx, txn->signature);
-	assert(rc == 0);
-	if (rc == -1)
+	switch (rc) {
+	case 0:
+		break;
+	case 1: /* rollback */
+		tnt_raise(ClientError, ER_TRANSACTION_CONFLICT);
+		break;
+	case 2: /* lock */
+		sp_destroy(txn->engine_tx);
+		tnt_raise(ClientError, ER_TRANSACTION_CONFLICT);
+		break;
+	case -1:
 		sophia_raise(env);
+		break;
+	}
 	rc = sp_commit(txn->engine_tx);
 	if (rc == -1) {
-		sophia_raise(env);
+		panic("sophia commit failed: txn->signature = %" PRIu64, txn->signature);
 	}
-	assert(rc == 0);
+}
+
+void
+SophiaEngine::rollbackStmt(struct txn_stmt *stmt)
+{
+	if (stmt->old_tuple)
+		tuple_unref(stmt->old_tuple);
+	if (stmt->new_tuple)
+		tuple_unref(stmt->new_tuple);
 }
 
 void
@@ -355,6 +365,18 @@ SophiaEngine::rollback(struct txn *txn)
 	txn->engine_tx = NULL;
 }
 
+void
+SophiaEngine::finish(struct txn *txn, bool)
+{
+	struct txn_stmt *stmt;
+	rlist_foreach_entry(stmt, &txn->stmts, next) {
+		if (stmt->old_tuple)
+			tuple_unref(stmt->old_tuple);
+		if (stmt->new_tuple)
+			tuple_unref(stmt->new_tuple);
+	}
+}
+
 void
 SophiaEngine::beginJoin()
 {
diff --git a/src/box/sophia_engine.h b/src/box/sophia_engine.h
index 3125d3a587..8f5c521b65 100644
--- a/src/box/sophia_engine.h
+++ b/src/box/sophia_engine.h
@@ -34,12 +34,14 @@ struct SophiaEngine: public Engine {
 	SophiaEngine();
 	virtual void init();
 	virtual Handler *open();
-	virtual Index *createIndex(struct key_def*);
+	virtual Index *createIndex(struct key_def *);
 	virtual void dropIndex(Index*);
-	virtual void keydefCheck(struct space *space, struct key_def*f);
-	virtual void begin(struct txn*, struct space*);
-	virtual void commit(struct txn*);
-	virtual void rollback(struct txn*);
+	virtual void keydefCheck(struct space *space, struct key_def *f);
+	virtual void begin(struct txn *txn, struct space *space);
+	virtual void commit(struct txn *txn);
+	virtual void rollbackStmt(struct txn_stmt *stmt);
+	virtual void rollback(struct txn *txn);
+	virtual void finish(struct txn *txn, bool is_commit);
 	virtual void beginJoin();
 	virtual void recoverToCheckpoint(int64_t);
 	virtual void endRecovery();
@@ -52,6 +54,7 @@ struct SophiaEngine: public Engine {
 private:
 	int64_t m_prev_checkpoint_lsn;
 	int64_t m_checkpoint_lsn;
+public:
 	int recovery_complete;
 };
 
diff --git a/src/box/sophia_index.cc b/src/box/sophia_index.cc
index 54ed7fa21d..f4b9b2d276 100644
--- a/src/box/sophia_index.cc
+++ b/src/box/sophia_index.cc
@@ -42,8 +42,15 @@
 #include <stdio.h>
 #include <inttypes.h>
 
-static inline int
-sophia_index_stmt(void *tx, void *db, int del, struct key_def *key_def, struct tuple *tuple)
+enum sophia_op {
+	SOPHIA_SET,
+	SOPHIA_DELETE
+};
+
+static inline void
+sophia_write(void *env, void *db, void *tx, enum sophia_op op,
+             struct key_def *key_def,
+             struct tuple *tuple)
 {
 	const char *key = tuple_field(tuple, key_def->parts[0].fieldno);
 	const char *keyptr = key;
@@ -51,18 +58,25 @@ sophia_index_stmt(void *tx, void *db, int del, struct key_def *key_def, struct t
 	size_t keysize = keyptr - key;
 	void *o = sp_object(db);
 	if (o == NULL)
-		return -1;
+		sophia_raise(env);
 	sp_set(o, "key", key, keysize);
-	if (del) {
-		return sp_delete(tx, o);
-	}
 	sp_set(o, "value", tuple->data, tuple->bsize);
-	return sp_set(tx, o);
+	int rc;
+	switch (op) {
+	case SOPHIA_DELETE:
+		rc = sp_delete(tx, o);
+		break;
+	case SOPHIA_SET:
+		rc = sp_set(tx, o);
+		break;
+	}
+	if (rc == -1)
+		sophia_raise(env);
 }
 
 static struct tuple*
-sophia_index_get(void *env, void *db, void *tx, const char *key, size_t keysize,
-                 struct tuple_format *format)
+sophia_read(void *env, void *db, void *tx, const char *key, size_t keysize,
+            struct tuple_format *format)
 {
 	void *o = sp_object(db);
 	if (o == NULL)
@@ -78,13 +92,19 @@ sophia_index_get(void *env, void *db, void *tx, const char *key, size_t keysize,
 	return tuple_new(format, (char*)value, (char*)value + valuesize);
 }
 
+struct sophia_format {
+	uint32_t offset;
+	uint16_t size;
+} __attribute__((packed));
+
 static inline int
-sophia_index_compare(char *a, size_t asz __attribute__((unused)),
-                     char *b, size_t bsz __attribute__((unused)),
-                     void *arg)
+sophia_compare(char *a, size_t asz __attribute__((unused)),
+               char *b, size_t bsz __attribute__((unused)),
+               void *arg)
 {
 	struct key_def *key_def = (struct key_def*)arg;
-
+	a = a + ((struct sophia_format*)a)[0].offset;
+	b = b + ((struct sophia_format*)b)[0].offset;
 	int rc = tuple_compare_field(a, b, key_def->parts[0].type);
 	return (rc == 0) ? 0 :
 	       ((rc > 0) ? 1 : -1);
@@ -102,9 +122,12 @@ sophia_configure(struct space *space, struct key_def *key_def)
 	char name[128];
 	snprintf(name, sizeof(name), "%" PRIu32, key_def->space_id);
 	sp_set(c, "db", name);
+	snprintf(name, sizeof(name), "db.%" PRIu32 ".format",
+	         key_def->space_id);
+	sp_set(c, name, "document");
 	snprintf(name, sizeof(name), "db.%" PRIu32 ".index.cmp",
 	         key_def->space_id);
-	snprintf(pointer, sizeof(pointer), "pointer: %p", (void*)sophia_index_compare);
+	snprintf(pointer, sizeof(pointer), "pointer: %p", (void*)sophia_compare);
 	snprintf(pointer_arg, sizeof(pointer_arg), "pointer: %p", (void*)key_def);
 	sp_set(c, name, pointer, pointer_arg);
 	snprintf(name, sizeof(name), "db.%" PRIu32 ".compression", key_def->space_id);
@@ -155,29 +178,6 @@ SophiaIndex::~SophiaIndex()
 	}
 }
 
-struct tuple *
-SophiaIndex::random(uint32_t rnd) const
-{
-	void *o = sp_object(db);
-	if (o == NULL)
-		sophia_raise(env);
-	sp_set(o, "key", &rnd, sizeof(rnd));
-	sp_set(o, "order", "random");
-	void *c = sp_cursor(db, o);
-	if (c == NULL)
-		sophia_raise(env);
-	auto scoped_guard =
-		make_scoped_guard([=] { sp_destroy(c); });
-	o = sp_get(c);
-	if (o == NULL)
-		return NULL;
-	struct space *space = space_cache_find(key_def->space_id);
-	int valuesize;
-	void *value = sp_get(o, "value", &valuesize);
-	return tuple_new(space->format, (char*)value,
-	                 (char*)value + valuesize);
-}
-
 size_t
 SophiaIndex::size() const
 {
@@ -219,7 +219,7 @@ SophiaIndex::findByKey(const char *key, uint32_t part_count) const
 	size_t keysize = keyptr - key;
 	struct space *space = space_cache_find(key_def->space_id);
 	void *tx = in_txn() ? in_txn()->engine_tx : NULL;
-	return sophia_index_get(env, db, tx, key, keysize, space->format);
+	return sophia_read(env, db, tx, key, keysize, space->format);
 }
 
 struct tuple *
@@ -229,29 +229,37 @@ SophiaIndex::replace(struct tuple *old_tuple, struct tuple *new_tuple,
 	struct space *space = space_cache_find(key_def->space_id);
 	struct txn *txn = in_txn();
 	assert(txn != NULL && txn->engine_tx != NULL);
+	void *tx = txn->engine_tx;
 
-	/* do not involve in tarantool transaction regarding old_tuple,
-	 * always return NULL.
-	*/
+	/* This method does not return old tuple for replace,
+	 * insert or update.
+	 *
+	 * Delete does return old tuple to be properly
+	 * scheduled for wal write.
+	 */
+
+	/* Switch from INSERT to REPLACE during recovery.
+	 *
+	 * Database might hold newer key version than currenly
+	 * recovered log record.
+	 */
+	if (mode == DUP_INSERT) {
+		SophiaEngine *engine = (SophiaEngine*)space->handler->engine;
+		if (! engine->recovery_complete)
+			mode = DUP_REPLACE_OR_INSERT;
+	}
 
 	/* delete */
-	int rc;
 	if (old_tuple && new_tuple == NULL) {
-		assert(old_tuple != NULL);
-		rc = sophia_index_stmt(txn->engine_tx, db, 1, key_def,
-				       old_tuple);
-		if (rc == -1)
-			sophia_raise(env);
-		return NULL;
+		sophia_write(env, db, tx, SOPHIA_DELETE, key_def, old_tuple);
+		tuple_ref(old_tuple);
+		return old_tuple;
 	}
 
 	/* update */
 	if (old_tuple && new_tuple) {
 		/* assume no primary key update is supported */
-		rc = sophia_index_stmt(txn->engine_tx, db, 0, key_def,
-				       new_tuple);
-		if (rc == -1)
-			sophia_raise(env);
+		sophia_write(env, db, tx, SOPHIA_SET, key_def, new_tuple);
 		return NULL;
 	}
 
@@ -263,8 +271,7 @@ SophiaIndex::replace(struct tuple *old_tuple, struct tuple *new_tuple,
 		mp_next(&keyptr);
 		size_t keysize = keyptr - key;
 		struct tuple *dup_tuple =
-			sophia_index_get(env, db, txn->engine_tx, key,
-					 keysize, space->format);
+			sophia_read(env, db, tx, key, keysize, space->format);
 		if (dup_tuple) {
 			tuple_ref(dup_tuple);
 			int error = 0;
@@ -280,10 +287,7 @@ SophiaIndex::replace(struct tuple *old_tuple, struct tuple *new_tuple,
 		}
 	}
 	case DUP_REPLACE_OR_INSERT:
-		rc = sophia_index_stmt(txn->engine_tx, db, 0, key_def,
-				       new_tuple);
-		if (rc == -1)
-			sophia_raise(env);
+		sophia_write(env, db, tx, SOPHIA_SET, key_def, new_tuple);
 		break;
 	case DUP_REPLACE:
 	default:
@@ -352,8 +356,8 @@ sophia_iterator_eq(struct iterator *ptr)
 	ptr->next = sophia_iterator_last;
 	struct sophia_iterator *it = (struct sophia_iterator *) ptr;
 	assert(it->cursor == NULL);
-	return sophia_index_get(it->env, it->db, it->tx, it->key, it->keysize,
-	                        it->space->format);
+	return sophia_read(it->env, it->db, it->tx, it->key, it->keysize,
+	                   it->space->format);
 }
 
 struct iterator *
diff --git a/src/box/sophia_index.h b/src/box/sophia_index.h
index cbef29754e..6dd6c39d89 100644
--- a/src/box/sophia_index.h
+++ b/src/box/sophia_index.h
@@ -37,7 +37,6 @@ class SophiaIndex: public Index {
 	~SophiaIndex();
 
 	virtual size_t size() const;
-	virtual struct tuple *random(uint32_t rnd) const;
 	virtual struct tuple *findByKey(const char *key, uint32_t part_count) const;
 	virtual struct tuple *replace(struct tuple *old_tuple,
 				      struct tuple *new_tuple,
diff --git a/src/box/txn.cc b/src/box/txn.cc
index 3d4ae07f2d..cb4fc3e74c 100644
--- a/src/box/txn.cc
+++ b/src/box/txn.cc
@@ -89,7 +89,8 @@ txn_replace(struct txn *txn, struct space *space,
 	}
 	stmt->space = space;
 
-	/* Memtx doesn't allow yields between statements of
+	/*
+	 * Memtx doesn't allow yields between statements of
 	 * a transaction. Set a trigger which would roll
 	 * back the transaction if there is a yield.
 	 */
@@ -103,7 +104,6 @@ txn_replace(struct txn *txn, struct space *space,
 			}
 		}
 	}
-
 	/*
 	 * Run on_replace triggers. For now, disallow mutation
 	 * of tuples in the trigger.
@@ -184,21 +184,12 @@ txn_begin_stmt(struct request *request, struct space *space)
 }
 
 void
-txn_commit_stmt(struct txn *txn, struct port *port)
+txn_commit_stmt(struct txn *txn)
 {
-	struct txn_stmt *stmt;
-	struct tuple *tuple;
-	stmt = txn_stmt(txn);
-	if (txn->autocommit)
+	if (txn->autocommit) {
 		txn_commit(txn);
-	/* Adding result to port must be after possible WAL write.
-	 * The reason is that any yield between port_add_tuple and port_eof
-	 * calls could lead to sending not finished response to iproto socket.
-	 */
-	if ((tuple = stmt->new_tuple) || (tuple = stmt->old_tuple))
-		port_add_tuple(port, tuple);
-	if (txn->autocommit)
-		txn_finish(txn);
+		txn_finish(txn, true);
+	}
 }
 
 void
@@ -207,15 +198,6 @@ txn_commit(struct txn *txn)
 	assert(txn == in_txn());
 	struct txn_stmt *stmt;
 	/* if (!txn->autocommit && txn->n_stmts && engine_no_yield(txn->engine)) */
-
-	/* xxx: temporary workaround to handle transaction
-	 *      conflicts with sophia.
-	 */
-	if (txn->engine) {
-		txn->engine->commit(txn);
-		txn->engine_tx = NULL;
-	}
-
 	trigger_clear(&txn->fiber_on_yield);
 	trigger_clear(&txn->fiber_on_stop);
 
@@ -237,21 +219,19 @@ txn_commit(struct txn *txn)
 			tnt_raise(LoggedError, ER_WAL_IO);
 		txn->signature = res;
 	}
-	/*
+
+	/* xxx: engine commit may throw on conflict or error */
 	if (txn->engine)
 		txn->engine->commit(txn);
-	*/
+
 	trigger_run(&txn->on_commit, txn); /* must not throw. */
 }
 
 void
-txn_finish(struct txn *txn)
+txn_finish(struct txn *txn, bool commit)
 {
-	struct txn_stmt *stmt;
-	rlist_foreach_entry(stmt, &txn->stmts, next) {
-		if (stmt->old_tuple)
-			tuple_unref(stmt->old_tuple);
-	}
+	if (txn->engine)
+		txn->engine->finish(txn, commit);
 	TRASH(txn);
 	/** Free volatile txn memory. */
 	fiber_gc();
@@ -273,16 +253,9 @@ txn_rollback_stmt()
 	if (txn->autocommit)
 		return txn_rollback();
 	struct txn_stmt *stmt = txn_stmt(txn);
-	if (stmt->old_tuple || stmt->new_tuple) {
-		if (! space_is_sophia(stmt->space)) {
-			space_replace(stmt->space,
-			              stmt->new_tuple,
-			              stmt->old_tuple, DUP_INSERT);
-		}
-		if (stmt->new_tuple)
-			tuple_unref(stmt->new_tuple);
-	}
-	stmt->old_tuple = stmt->new_tuple = NULL;
+	txn->engine->rollbackStmt(stmt);
+	stmt->old_tuple = NULL;
+	stmt->new_tuple = NULL;
 	stmt->space = NULL;
 	stmt->row = NULL;
 }
@@ -296,18 +269,10 @@ txn_rollback()
 	if (txn->engine)
 		txn->engine->rollback(txn);
 	trigger_run(&txn->on_rollback, txn); /* must not throw. */
-	struct txn_stmt *stmt;
-	rlist_foreach_entry(stmt, &txn->stmts, next) {
-		if (stmt->new_tuple)
-			tuple_unref(stmt->new_tuple);
-	}
 	/* if (!txn->autocommit && txn->n_stmts && engine_no_yield(txn->engine)) */
 		trigger_clear(&txn->fiber_on_yield);
 		trigger_clear(&txn->fiber_on_stop);
-	TRASH(txn);
-	/** Free volatile txn memory. */
-	fiber_gc();
-	fiber_set_txn(fiber(), NULL);
+	txn_finish(txn, false);
 }
 
 void
diff --git a/src/box/txn.h b/src/box/txn.h
index 9ae74462df..9a205fa1e1 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -97,7 +97,7 @@ txn_begin_stmt(struct request *request, struct space *space);
  * the current transaction as well.
  */
 void
-txn_commit_stmt(struct txn *txn, struct port *port);
+txn_commit_stmt(struct txn *txn);
 
 /**
  * Rollback a statement. In autocommit mode,
@@ -125,7 +125,7 @@ txn_commit(struct txn *txn);
  * @pre txn == in_txn()
  */
 void
-txn_finish(struct txn *txn);
+txn_finish(struct txn *txn, bool commit);
 
 /** Rollback a transaction, if any. */
 void
diff --git a/test/box/misc.result b/test/box/misc.result
index 8b9c75937e..c95502389f 100644
--- a/test/box/misc.result
+++ b/test/box/misc.result
@@ -203,6 +203,7 @@ t;
   - 'box.error.CROSS_ENGINE_TRANSACTION : 81'
   - 'box.error.MODIFY_INDEX : 14'
   - 'box.error.PASSWORD_MISMATCH : 47'
+  - 'box.error.TRANSACTION_CONFLICT : 97'
   - 'box.error.NO_SUCH_ENGINE : 57'
   - 'box.error.FIELD_TYPE : 23'
   - 'box.error.ACCESS_DENIED : 42'
diff --git a/test/long_run/box.lua b/test/long_run/box.lua
new file mode 100644
index 0000000000..9cac3cadb0
--- /dev/null
+++ b/test/long_run/box.lua
@@ -0,0 +1,21 @@
+#!/usr/bin/env tarantool
+
+require('suite')
+
+os.execute("mkdir -p sophia_test")
+
+local sophia = {
+	threads = 5
+}
+
+box.cfg {
+    listen            = os.getenv("LISTEN"),
+    slab_alloc_arena  = 0.1,
+    pid_file          = "tarantool.pid",
+    rows_per_wal      = 500000,
+    sophia_dir        = "./sophia_test",
+    sophia            = sophia,
+    custom_proc_title = "default"
+}
+
+require('console').listen(os.getenv('ADMIN'))
diff --git a/test/long_run/memtx_delete_insert.result b/test/long_run/memtx_delete_insert.result
new file mode 100644
index 0000000000..567eff06ab
--- /dev/null
+++ b/test/long_run/memtx_delete_insert.result
@@ -0,0 +1,11 @@
+dofile('suite.lua')
+---
+...
+engine_name = 'memtx'
+---
+...
+delete_insert(engine_name)
+---
+- - 100000
+  - YYVWRROXKBJACORUZIFH
+...
diff --git a/test/long_run/memtx_delete_insert.test.lua b/test/long_run/memtx_delete_insert.test.lua
new file mode 100644
index 0000000000..4357cbf93a
--- /dev/null
+++ b/test/long_run/memtx_delete_insert.test.lua
@@ -0,0 +1,5 @@
+
+dofile('suite.lua')
+
+engine_name = 'memtx'
+delete_insert(engine_name)
diff --git a/test/long_run/memtx_dru.result b/test/long_run/memtx_dru.result
new file mode 100644
index 0000000000..434d264489
--- /dev/null
+++ b/test/long_run/memtx_dru.result
@@ -0,0 +1,59 @@
+dofile('suite.lua')
+---
+...
+engine_name = 'memtx'
+---
+...
+math.randomseed(1)
+---
+...
+delete_replace_update(engine_name)
+---
+- - 100000
+  - 3
+  - EMLMARJEFGTVZHSKBOOS
+  - BUPYVKWEHVEFKSZFAQIK
+...
+math.randomseed(2)
+---
+...
+delete_replace_update(engine_name)
+---
+- - 100000
+  - 3
+  - DYHLURGFSDJLBUPOBAUV
+  - UHMBETMEPMQWOIHLAOSR
+...
+math.randomseed(3)
+---
+...
+delete_replace_update(engine_name)
+---
+- - 100000
+  - 1
+  - KOWURFNHHVGDRYNKXJUM
+  - RSMFJYXLNSOILKDEFJZW
+...
+math.randomseed(4)
+---
+...
+delete_replace_update(engine_name)
+---
+- - 100000
+  - 3
+  - OLLNLRZCZFCOHXGVAVPX
+  - RXXYWNPDXVVGONOGTEZK
+...
+math.randomseed(5)
+---
+...
+delete_replace_update(engine_name)
+---
+- - 100000
+  - 3
+  - LDPXEIVXEAAAGIVUFAZL
+  - HKBELZCRRWNZHFIRAHJD
+...
+delete_replace_update_cleanup()
+---
+...
diff --git a/test/long_run/memtx_dru.test.lua b/test/long_run/memtx_dru.test.lua
new file mode 100644
index 0000000000..fde52d40bb
--- /dev/null
+++ b/test/long_run/memtx_dru.test.lua
@@ -0,0 +1,21 @@
+
+dofile('suite.lua')
+
+engine_name = 'memtx'
+
+math.randomseed(1)
+delete_replace_update(engine_name)
+
+math.randomseed(2)
+delete_replace_update(engine_name)
+
+math.randomseed(3)
+delete_replace_update(engine_name)
+
+math.randomseed(4)
+delete_replace_update(engine_name)
+
+math.randomseed(5)
+delete_replace_update(engine_name)
+
+delete_replace_update_cleanup()
diff --git a/test/long_run/sophia_delete_insert.result b/test/long_run/sophia_delete_insert.result
new file mode 100644
index 0000000000..81507fa838
--- /dev/null
+++ b/test/long_run/sophia_delete_insert.result
@@ -0,0 +1,11 @@
+dofile('suite.lua')
+---
+...
+engine_name = 'sophia'
+---
+...
+delete_insert(engine_name)
+---
+- - 100000
+  - YYVWRROXKBJACORUZIFH
+...
diff --git a/test/long_run/sophia_delete_insert.test.lua b/test/long_run/sophia_delete_insert.test.lua
new file mode 100644
index 0000000000..4810a16554
--- /dev/null
+++ b/test/long_run/sophia_delete_insert.test.lua
@@ -0,0 +1,5 @@
+
+dofile('suite.lua')
+
+engine_name = 'sophia'
+delete_insert(engine_name)
diff --git a/test/long_run/sophia_dru.result b/test/long_run/sophia_dru.result
new file mode 100644
index 0000000000..2cf8e9bf0f
--- /dev/null
+++ b/test/long_run/sophia_dru.result
@@ -0,0 +1,59 @@
+dofile('suite.lua')
+---
+...
+engine_name = 'sophia'
+---
+...
+math.randomseed(1)
+---
+...
+delete_replace_update(engine_name)
+---
+- - 100000
+  - 3
+  - EMLMARJEFGTVZHSKBOOS
+  - BUPYVKWEHVEFKSZFAQIK
+...
+math.randomseed(2)
+---
+...
+delete_replace_update(engine_name)
+---
+- - 100000
+  - 3
+  - DYHLURGFSDJLBUPOBAUV
+  - UHMBETMEPMQWOIHLAOSR
+...
+math.randomseed(3)
+---
+...
+delete_replace_update(engine_name)
+---
+- - 100000
+  - 1
+  - KOWURFNHHVGDRYNKXJUM
+  - RSMFJYXLNSOILKDEFJZW
+...
+math.randomseed(4)
+---
+...
+delete_replace_update(engine_name)
+---
+- - 100000
+  - 3
+  - OLLNLRZCZFCOHXGVAVPX
+  - RXXYWNPDXVVGONOGTEZK
+...
+math.randomseed(5)
+---
+...
+delete_replace_update(engine_name)
+---
+- - 100000
+  - 3
+  - LDPXEIVXEAAAGIVUFAZL
+  - HKBELZCRRWNZHFIRAHJD
+...
+delete_replace_update_cleanup()
+---
+...
diff --git a/test/long_run/sophia_dru.test.lua b/test/long_run/sophia_dru.test.lua
new file mode 100644
index 0000000000..14180eb2d7
--- /dev/null
+++ b/test/long_run/sophia_dru.test.lua
@@ -0,0 +1,21 @@
+
+dofile('suite.lua')
+
+engine_name = 'sophia'
+
+math.randomseed(1)
+delete_replace_update(engine_name)
+
+math.randomseed(2)
+delete_replace_update(engine_name)
+
+math.randomseed(3)
+delete_replace_update(engine_name)
+
+math.randomseed(4)
+delete_replace_update(engine_name)
+
+math.randomseed(5)
+delete_replace_update(engine_name)
+
+delete_replace_update_cleanup()
diff --git a/test/long_run/suite.ini b/test/long_run/suite.ini
new file mode 100644
index 0000000000..747c518ba7
--- /dev/null
+++ b/test/long_run/suite.ini
@@ -0,0 +1,9 @@
+[default]
+core = tarantool
+description = long running tests
+script = box.lua
+disabled = memtx_dru.test.lua sophia_dru.test.lua memtx_delete_insert.test.lua sophia_delete_insert.test.lua
+valgrind_disabled =
+release_disabled =
+lua_libs = suite.lua
+use_unix_sockets = True
diff --git a/test/long_run/suite.lua b/test/long_run/suite.lua
new file mode 100644
index 0000000000..dbd4eab028
--- /dev/null
+++ b/test/long_run/suite.lua
@@ -0,0 +1,109 @@
+
+function string_function()
+	local random_number
+	local random_string
+	random_string = ""
+	for x = 1,20,1 do
+		random_number = math.random(65, 90)
+		random_string = random_string .. string.char(random_number)
+	end
+	return random_string
+end
+
+function delete_replace_update_cleanup()
+	if (box.space._space.index.name:select{'tester'}[1] ~= nil) then
+		box.space.tester:drop()
+	end
+end
+
+function delete_replace_update(engine_name)
+	local string_value
+	delete_replace_update_cleanup()
+	box.schema.space.create('tester', {engine=engine_name})
+	box.space.tester:create_index('primary',{type = 'tree', parts = {2, 'STR'}})
+
+	counter = 1
+	while counter < 100000 do
+		string_value = string_function()
+
+		string_table = box.space.tester.index.primary:select({string_value}, {iterator = 'GE', limit = 1})
+		if string_table[1] == nil then
+			box.space.tester:insert{counter, string_value}
+			string_value_2 = string_value
+		else
+			string_value_2 = string_table[1][2]
+		end
+
+		if string_value_2 == nil then
+			box.space.tester:insert{counter, string_value}
+			string_value_2 = string_value
+		end
+
+		random_number = math.random(1,6)
+
+		string_value_3 = string_function()
+--		print('<'..counter..'> [' ..  random_number .. '] value_2: ' .. string_value_2 .. ' value_3: ' .. string_value_3)
+		if random_number == 1 then
+			box.space.tester:delete{string_value_2}
+		end
+		if random_number == 2 then
+			box.space.tester:replace{counter, string_value_2, string_value_3}
+		end
+		if random_number == 3 then
+			box.space.tester:delete{string_value_2}
+			box.space.tester:insert{counter, string_value_2}
+		end
+		if random_number == 4 then
+			if counter < 1000000 then
+				box.space.tester:delete{string_value_3}
+				box.space.tester:insert{counter, string_value_3, string_value_2}
+			end
+		end
+		if random_number == 5 then
+			box.space.tester:update({string_value_2}, {{'=', 1, string_value_3}})
+		end
+		if random_number == 6 then
+			box.space.tester:update({string_value_2}, {{'=', 1, string_value_3}})
+		end
+		counter = counter + 1
+	end
+
+	return {counter, random_number, string_value_2, string_value_3}
+end
+
+function delete_insert(engine_name)
+	local string_value
+	if (box.space._space.index.name:select{'tester'}[1] ~= nil) then
+		box.space.tester:drop()
+	end
+	box.schema.space.create('tester', {engine=engine_name})
+	box.space.tester:create_index('primary',{type = 'tree', parts = {2, 'STR'}})
+	counter = 1
+	while counter < 100000 do
+		string_value = string_function()
+		string_table = box.space.tester.index.primary:select({string_value}, {iterator = 'GE', limit = 1})
+
+		if string_table[1] == nil then
+			-- print (1, ' insert', counter, string_value)
+			box.space.tester:insert{counter, string_value}
+			string_value_2 = string_value
+		else
+			string_value_2 = string_table[1][2]
+		end
+
+		if string_value_2 == nil then
+			-- print (2, ' insert', counter, string_value)
+			box.space.tester:insert{counter, string_value}
+			string_value_2 = string_value
+		end
+
+		-- print (3, ' delete', counter, string_value_2)
+		box.space.tester:delete{string_value_2}
+
+		-- print (4, ' insert', counter, string_value_2)
+		box.space.tester:insert{counter, string_value_2}
+
+		counter = counter + 1
+	end
+	return {counter, string_value_2, string_value_3}
+end
diff --git a/test/sophia/suite.ini b/test/sophia/suite.ini
index 368cc90bd4..6c365fdcfe 100644
--- a/test/sophia/suite.ini
+++ b/test/sophia/suite.ini
@@ -2,7 +2,7 @@
 core = tarantool
 description = sophia integration tests
 script = box.lua
-disabled = info.test.lua truncate.test.lua
+disabled = info.test.lua truncate.test.lua random.test.lua
 valgrind_disabled =
 release_disabled =
 lua_libs = suite.lua index_random_test.lua
diff --git a/third_party/sophia b/third_party/sophia
index 68ce375fb7..ee2f6d0145 160000
--- a/third_party/sophia
+++ b/third_party/sophia
@@ -1 +1 @@
-Subproject commit 68ce375fb75f9194edf967d96d0ff5dc04f3724f
+Subproject commit ee2f6d01454a2374877bcdda5b5791497b737f9b
-- 
GitLab