From 316c41c05f40ff5da65961366abee454e0abbafe Mon Sep 17 00:00:00 2001
From: Roman Tsisyk <roman@tsisyk.com>
Date: Wed, 5 Dec 2012 13:21:36 +0400
Subject: [PATCH] Minor fixes in space_replace, index_replace and txn_replace
 methods

---
 src/box/index.m   | 95 ++++++++++++++++++++++-------------------------
 src/box/request.m |  3 +-
 src/box/space.h   | 37 +++++++++++-------
 src/box/space.m   |  2 +-
 src/box/tree.m    | 91 ++++++++++++++++++++-------------------------
 src/box/txn.m     |  5 +--
 6 files changed, 112 insertions(+), 121 deletions(-)

diff --git a/src/box/index.m b/src/box/index.m
index 94228c24a6..69bdefd64b 100644
--- a/src/box/index.m
+++ b/src/box/index.m
@@ -454,9 +454,9 @@ hash_iterator_lstr_eq(struct iterator *it)
 			  : (struct tuple *) new_tuple
 			  : (u32) flags
 {
-	assert (old_tuple != NULL || new_tuple != NULL);
-	assert(key_def->is_unique);
+	/* Mostly like tree::replace */
 
+	assert (old_tuple != NULL || new_tuple != NULL);
 	void *node1 = alloca([self node_size]);
 	void *node2 = alloca([self node_size]);
 
@@ -464,72 +464,68 @@ hash_iterator_lstr_eq(struct iterator *it)
 		/* Case #1: replace(new_tuple); */
 
 		void *new_node = node1;
-		void *old_node = node2;
-
+		void *replaced_node = node2;
 		[self fold: new_node: new_tuple];
-		[self replaceNode: hash: new_node: &old_node];
 
-		if (unlikely(old_node && (flags & BOX_ADD))) {
-			/* Rollback changes */
-			[self replaceNode: hash: old_node: NULL];
+		/* Try to optimistically replace the new_tuple in the space */
+		[self replaceNode: hash: new_node: &replaced_node];
+
+		if (unlikely(key_def->is_unique && (flags & BOX_ADD)
+			     && replaced_node != NULL)) {
+			/* BOX_ADD, a tuple with the same key is found */
+			[self replaceNode: hash: replaced_node: NULL];
 			tnt_raise(ClientError, :ER_TUPLE_FOUND);
 		}
 
-		if (unlikely(!old_node && (flags & BOX_REPLACE))) {
-			/* Rollback changes */
+		if (unlikely(key_def->is_unique && (flags & BOX_REPLACE)
+			     && replaced_node == NULL)) {
+			/* BOX_REPLACE,a tuple with the same key is not found */
 			[self deleteNode: hash: new_node: NULL];
 			tnt_raise(ClientError, :ER_TUPLE_NOT_FOUND);
 		}
 
-		/* Return a removed node */
-		return [self unfold: old_node];
+		/* Return the removed node */
+		return [self unfold: replaced_node];
 	} else if (!new_tuple && old_tuple) {
 		/* Case #2: remove(old_tuple) */
 
 		void *old_node  = node1;
-		void *old_node2 = node2;
+		void *removed_node = node2;
 
 		[self fold: old_node: old_tuple];
-		[self deleteNode: hash: old_node: &old_node2];
-#if 0		/* TODO: A new undocumented feature */
-		if (unlikely((flags & BOX_REPLACE) && !old_node2)) {
-			/* Rollback changes */
-			[self replaceNode: hash: old_node: NULL];
-			tnt_raise(ClientError, :ER_TUPLE_NOT_FOUND);
-		}
-#endif
-		return [self unfold: old_node2];
+		[self deleteNode: hash: old_node: &removed_node];
+		assert([self unfold: removed_node] == NULL ||
+		       [self unfold: removed_node] == old_tuple);
+
+		return [self unfold: removed_node];
 	} else /* (old_tuple != NULL && new_tuple != NULL) */ {
 		/* Case #3: remove(old_tuple); insert(new_tuple) */
 
-		/* BOX_ADD is only supported in this case */
-		assert(flags & BOX_ADD);
-
+		assert(!key_def->is_unique || (flags & BOX_ADD));
 		void *new_node = node1;
-		void *old_node = node2;
+		void *replaced_node = node2;
 
 		[self fold: new_node: new_tuple];
-		[self replaceNode: hash: new_node: &old_node];
-
-		struct tuple *ret = NULL;
-		if (old_node) {
-			ret = [self unfold: old_node];
-
-			if (unlikely(ret != old_tuple)) {
-				/* Rollback changes */
-				[self replaceNode: hash: old_node: NULL];
-				tnt_raise(ClientError, :ER_TUPLE_FOUND);
-			}
-		} else {
-			void *old_node  = node1;
-			void *old_node2 = node2;
-			[self fold: old_node: old_tuple];
-			[self deleteNode: hash: old_node: &old_node2];
-
-			ret = [self unfold: old_node2];
+		[self replaceNode: hash: new_node: &replaced_node];
+		assert(key_def->is_unique || replaced_node == NULL);
+
+		if (replaced_node) {
+			/* The old_tuple had the same key as the new_tuple and
+			 * it was succesfully removed during replace call */
+			assert(old_tuple == [self unfold: replaced_node]);
+			return old_tuple;
 		}
 
-		return ret;
+		/* Nothing was removed during replace call,
+		 * so remove the old_tuple manually */
+		void *old_node  = node1;
+		void *removed_node = node2;
+		[self fold: old_node: old_tuple];
+		[self deleteNode: hash: old_node: &removed_node];
+
+		assert([self unfold: removed_node] == NULL ||
+		       [self unfold: removed_node] == old_tuple);
+		return  [self unfold: removed_node];
 	}
 
 	return NULL;
@@ -626,8 +622,7 @@ int32_key_to_value(void *key)
 	const struct mh_i32ptr_node_t *node_x = (struct mh_i32ptr_node_t *) node;
 
 	mh_int_t k = mh_i32ptr_get(h, node_x, NULL, NULL);
-	if (k != mh_end(int_hash) &&
-			mh_i32ptr_node(int_hash, k)->val == node_x->val) {
+	if (k != mh_end(int_hash)) {
 		if (pprev) {
 			memcpy(*pprev, mh_i32ptr_node(int_hash, k),
 			       sizeof(struct mh_i32ptr_node_t));
@@ -775,8 +770,7 @@ int64_key_to_value(void *key)
 	const struct mh_i64ptr_node_t *node_x = (struct mh_i64ptr_node_t *) node;
 
 	mh_int_t k = mh_i64ptr_get(h, node_x, NULL, NULL);
-	if (k != mh_end(int64_hash) &&
-			mh_i64ptr_node(int64_hash, k)->val == node_x->val) {
+	if (k != mh_end(int64_hash)) {
 		if (pprev) {
 			memcpy(*pprev, mh_i64ptr_node(int64_hash, k),
 			       sizeof(struct mh_i64ptr_node_t));
@@ -920,8 +914,7 @@ int64_key_to_value(void *key)
 	const struct mh_lstrptr_node_t *node_x = (struct mh_lstrptr_node_t *) node;
 
 	mh_int_t k = mh_lstrptr_get(h, node_x, NULL, NULL);
-	if (k != mh_end(str_hash) &&
-			mh_lstrptr_node(str_hash, k)->val == node_x->val) {
+	if (k != mh_end(str_hash)) {
 		if (pprev) {
 			memcpy(*pprev, mh_lstrptr_node(str_hash, k),
 			       sizeof(struct mh_lstrptr_node_t));
diff --git a/src/box/request.m b/src/box/request.m
index b4d7dd5d07..59f6898977 100644
--- a/src/box/request.m
+++ b/src/box/request.m
@@ -696,9 +696,8 @@ execute_update(struct request *request, struct txn *txn)
 	/* Try to find the tuple by primary key. */
 	struct tuple *old_tuple = [pk findByKey :key :key_part_count];
 
-	if (unlikely(old_tuple == NULL)) {
+	if (unlikely(old_tuple == NULL))
 		return;
-	}
 
 	/* number of operations */
 	u32 op_cnt = read_u32(data);
diff --git a/src/box/space.h b/src/box/space.h
index f26f662438..bc2b0dbacd 100644
--- a/src/box/space.h
+++ b/src/box/space.h
@@ -91,14 +91,28 @@ static inline i32 space_n(struct space *sp) { return sp->no; }
  * There is three major use cases for this method:
  *
  * 1. old_tuple = NULL, new_tuple != NULL
- * Insert or replace the @a new_tuple in the @a space.
+ * Inserts or replaces the @a new_tuple in the @a space. If no flags are set
+ * and an old tuple in the @a sp has the same primary key as @a new_tuple,
+ * the old tuple is removed before new tuple is inserted. If BOX_ADD flag is set
+ * and old tuple with same primary key is found, TupleFound exception is thrown.
+ * If BOX_REPLACE flags is set and old tple is not found, TupleNotFound
+ * exception is thrown. The return value is an tuple that was actually removed.
+ * This case is usually used for box.replace.
  *
  * 2. old_tuple != NULL, new_tuple == NULL
- * Remove @a old_tuple from the @a space.
+ * Removes @a old_tuple from the @a space. Please note, that @a old_tuple
+ * is removed only if it has same pointer address as an tuple in the @a sp.
+ * The return value is an @a old_tuple if it was found or NULL otherwise.
+ * This case is usually used for box.remove.
  *
  * 3. old_tuple != NULL, new_tuple != NULL
- * Atomically perform operation that equivalent to
- * replace(old_tuple, NULL, flags) + replace(NULL, new_tuple, flags).
+ * Perform atomically an operation that equivalent to
+ * replace(sp, old_tuple, NULL, flags) + replace(sp, NULL, new_tuple, flags).
+ * BOX_ADD flag must be always set, because only one tuple can be removed per
+ * one call. This case is usually used for box.update.
+ *
+ * The method is **atomic** in all cases. Changes are either applied to all
+ * indexes, or nothing applied at all.
  *
  * All possible cases are described in the table:
  * +--------------------------------------------------------------------------+
@@ -109,9 +123,9 @@ static inline i32 space_n(struct space *sp) { return sp->no; }
  * | NULL |  XX  |      |  XX  | 1 | 0 |                      | TupleFound    |
  * | NULL |  XX  |      | NULL | 0 | 1 |                      | TupleNotFound |
  * +------+------+------+------+---+---+----------------------+---------------+
- * |  XX  | NULL |  XX  |      | 0 |0,1| r(oldf)              | oldf          |
- * |  XX  | NULL | NULL |      | 0 | 0 |                      | NULL          |
- * |  XX  | NULL | NULL |      | 0 | 1 |                      | TupleNotFound |
+ * |  XX  | NULL |  XX  |      | * |0,1| r(oldf)              | oldf          |
+ * |  XX  | NULL | NULL |      | * | 0 |                      | NULL          |
+ * |  XX  | NULL | NULL |      | * | 1 |                      | TupleNotFound |
  * +------+------+------+------+---+---+----------------------+---------------+
  * |  XX  |  XX  |  XX  |!=oldf| 1 | 0 |                      | TupleFound    |
  * |  XX  |  XX  |  XX  |==oldf| 1 | 0 | r(oldf), i(new)      | oldf          |
@@ -122,13 +136,10 @@ static inline i32 space_n(struct space *sp) { return sp->no; }
  * oldf = findByTuple(old), newf = findByTuple(new), i = insert, r = remove,
  * A - BOX_ADD, R = BOX_REPLACE (in @a flags parameter).
  *
- * The operation is **atomic**, that is, all changes are either applied
- * consistently to all space's indexes or aren't applied at all.
- *
  * @param sp space
- * @param old_tuple tuple that should be removed (can be NULL)
- * @param new_tuple tuple that should be inserted (can be NULL)
- * @param flags
+ * @param old_tuple the tuple that should be removed (can be NULL)
+ * @param new_tuple the tuple that should be inserted (can be NULL)
+ * @param flags BOX_ADD and BOX_REPLACE flags, as defined in @a request.h
  * @return tuple that actually has been removed from the space
  */
 struct tuple *
diff --git a/src/box/space.m b/src/box/space.m
index f314970320..3b8f82e655 100644
--- a/src/box/space.m
+++ b/src/box/space.m
@@ -144,7 +144,7 @@ space_replace(struct space *sp, struct tuple *old_tuple,
 			return NULL;
 		}
 
-		/* Update seconday keys */
+		/* Update secondary keys */
 		for (; i < n; i++) {
 			Index *index = sp->index[i];
 			[index replace: removed_tuple: new_tuple: BOX_ADD];
diff --git a/src/box/tree.m b/src/box/tree.m
index bcca54f581..a17eb0156c 100644
--- a/src/box/tree.m
+++ b/src/box/tree.m
@@ -930,7 +930,8 @@ tree_iterator_gt(struct iterator *iterator)
 	return [self unfold: node];
 }
 
-- (void *) findNodeByTuple: (struct tuple *) tuple {
+- (struct tuple *) findByTuple: (struct tuple *) tuple
+{
 	struct key_data *key_data
 		= alloca(sizeof(struct key_data) + _SIZEOF_SPARSE_PARTS(tuple->field_count));
 
@@ -938,12 +939,8 @@ tree_iterator_gt(struct iterator *iterator)
 	key_data->part_count = tuple->field_count;
 	fold_with_sparse_parts(key_def, tuple, key_data->parts);
 
-	return sptree_index_find(&tree, key_data);
-}
+	void *node = sptree_index_find(&tree, key_data);
 
-- (struct tuple *) findByTuple: (struct tuple *) tuple
-{
-	void *node = [self findNodeByTuple: tuple];
 	return [self unfold: node];
 }
 
@@ -959,76 +956,68 @@ tree_iterator_gt(struct iterator *iterator)
 		/* Case #1: replace(new_tuple); */
 
 		void *new_node = node1;
-		void *old_node = node2;
+		void *replaced_node = node2;
 		[self fold: new_node: new_tuple];
-		sptree_index_replace(&tree, new_node, &old_node);
 
-		if (unlikely((flags & BOX_ADD) && old_node &&
-			     key_def->is_unique)) {
-			/* Rollback changes */
-			sptree_index_replace(&tree, old_node, NULL);
+		/* Try to optimistically replace the new_tuple in the space */
+		sptree_index_replace(&tree, new_node, &replaced_node);
+
+		if (unlikely(key_def->is_unique && (flags & BOX_ADD)
+			     && replaced_node != NULL)) {
+			/* BOX_ADD, a tuple with the same key is found */
+			sptree_index_replace(&tree, replaced_node, NULL);
 			tnt_raise(ClientError, :ER_TUPLE_FOUND);
 		}
 
-		if (unlikely((flags & BOX_REPLACE) && !old_node &&
-			     key_def->is_unique)) {
-			/* Rollback changes */
+		if (unlikely(key_def->is_unique && (flags & BOX_REPLACE)
+			     && replaced_node == NULL)) {
+			/* BOX_REPLACE,a tuple with the same key is not found */
 			sptree_index_delete(&tree, new_node, NULL);
 			tnt_raise(ClientError, :ER_TUPLE_NOT_FOUND);
 		}
 
-		/* Return a removed node */
-		return [self unfold: old_node];
+		/* Return the removed node */
+		return [self unfold: replaced_node];
 	} else if (!new_tuple && old_tuple) {
 		/* Case #2: remove(old_tuple) */
 
 		void *old_node  = node1;
-		void *old_node2 = node2;
+		void *removed_node = node2;
 
 		[self fold: old_node: old_tuple];
-		sptree_index_delete(&tree, old_node, &old_node2);
+		sptree_index_delete(&tree, old_node, &removed_node);
+		assert([self unfold: removed_node] == NULL ||
+		       [self unfold: removed_node] == old_tuple);
 
-#if 0		/* TODO: A new undocumented feature */
-		if (unlikely(key_def->is_unique &&
-			     (flags & BOX_REPLACE) && !old_node2)) {
-			/* Rollback changes */
-			sptree_index_replace(&tree, old_node, NULL);
-			tnt_raise(ClientError, :ER_TUPLE_NOT_FOUND);
-		}
-#endif
-		return [self unfold: old_node2];
+		return [self unfold: removed_node];
 	} else /* (old_tuple != NULL && new_tuple != NULL) */ {
 		/* Case #3: remove(old_tuple); insert(new_tuple) */
 
-		/* BOX_ADD is only supported in this case */
 		assert(!key_def->is_unique || (flags & BOX_ADD));
-
 		void *new_node = node1;
-		void *old_node = node2;
+		void *replaced_node = node2;
 
 		[self fold: new_node: new_tuple];
-		sptree_index_replace(&tree, new_node, &old_node);
-		assert(key_def->is_unique || old_node == NULL);
-
-		struct tuple *ret = NULL;
-		if (old_node) {
-			ret = [self unfold: old_node];
-
-			if (unlikely(ret != old_tuple)) {
-				/* Rollback changes */
-				sptree_index_replace(&tree, old_node, NULL);
-				tnt_raise(ClientError, :ER_TUPLE_FOUND);
-			}
-		} else {
-			void *old_node  = node1;
-			void *old_node2 = node2;
-			[self fold: old_node: old_tuple];
-			sptree_index_delete(&tree, old_node, &old_node2);
-
-			ret = [self unfold: old_node2];
+		sptree_index_replace(&tree, new_node, &replaced_node);
+		assert(key_def->is_unique || replaced_node == NULL);
+
+		if (replaced_node) {
+			/* The old_tuple had the same key as the new_tuple and
+			 * it was succesfully removed during replace call */
+			assert(old_tuple == [self unfold: replaced_node]);
+			return old_tuple;
 		}
 
-		return ret;
+		/* Nothing was removed during replace call,
+		 * so remove the old_tuple manually */
+		void *old_node  = node1;
+		void *removed_node = node2;
+		[self fold: old_node: old_tuple];
+		sptree_index_delete(&tree, old_node, &removed_node);
+
+		assert([self unfold: removed_node] == NULL ||
+		       [self unfold: removed_node] == old_tuple);
+		return  [self unfold: removed_node];
 	}
 
 	return NULL;
diff --git a/src/box/txn.m b/src/box/txn.m
index 4349e50cd1..95cb30deb8 100644
--- a/src/box/txn.m
+++ b/src/box/txn.m
@@ -74,8 +74,6 @@ txn_replace(struct txn *txn, struct space *space,
 	 * transaction in rollback().
 	 */
 
-	assert(txn->space == NULL || txn->space == space);
-
 	txn->old_tuple = space_replace(space, old_tuple, new_tuple, flags);
 	txn->new_tuple = new_tuple;
 	txn->space = space;
@@ -94,7 +92,8 @@ txn_commit(struct txn *txn)
 	if (txn->op == 0) /* Nothing to do. */
 		return;
 
-	if (txn->old_tuple || txn->new_tuple) {
+	if (!(txn->txn_flags & BOX_NOT_STORE) &&
+			(txn->old_tuple || txn->new_tuple)) {
 		int64_t lsn = next_lsn(recovery_state);
 		int res = wal_write(recovery_state, lsn, 0,
 				    txn->op, &txn->req);
-- 
GitLab