From d17ed09c375e6096dbb70ffeecac806b94f0f879 Mon Sep 17 00:00:00 2001
From: Alexandr Lyapunov <a.lyapunov@corp.mail.ru>
Date: Wed, 28 Oct 2015 11:28:43 +0300
Subject: [PATCH] fixed gh-1098, probably fixed gh-1084 : duplicate inserts

Fixed a bug in B-tree that could slightly break structure is presence of freezed iterators
---
 src/lib/salad/bps_tree.h  | 156 ++++++++++++++++++++++++++++++++------
 test/unit/bps_tree_itr.cc |   9 +++
 2 files changed, 142 insertions(+), 23 deletions(-)

diff --git a/src/lib/salad/bps_tree.h b/src/lib/salad/bps_tree.h
index 63715710b1..47a6e8e0dc 100644
--- a/src/lib/salad/bps_tree.h
+++ b/src/lib/salad/bps_tree.h
@@ -388,6 +388,8 @@ typedef uint32_t bps_tree_block_id_t;
 #define bps_tree_reserve_blocks _bps_tree(reserve_blocks)
 #define bps_tree_insert_first_elem _bps_tree(insert_first_elem)
 #define bps_tree_collect_path _bps_tree(collect_path)
+#define bps_tree_touch_leaf_path_max_elem _bps_tree(touch_leaf_path_max_elem)
+#define bps_tree_touch_path _bps_tree(touch_path_max_elem)
 #define bps_tree_process_replace _bps_tree(process_replace)
 #define bps_tree_debug_memmove _bps_tree(debug_memmove)
 #define bps_tree_insert_into_leaf _bps_tree(insert_into_leaf)
@@ -926,6 +928,10 @@ struct bps_inner_path_elem {
 	struct bps_inner_path_elem *parent;
 	/* Pointer to the sequent to the max element in the subtree */
 	bps_tree_elem_t *max_elem_copy;
+	/* Holder of max_elem_copy (block_id) */
+	bps_tree_block_id_t max_elem_block_id;
+	/* Holder of max_elem_copy (pos) */
+	bps_tree_pos_t max_elem_pos;
 };
 
 /**
@@ -946,6 +952,10 @@ struct bps_leaf_path_elem {
 	bps_inner_path_elem *parent;
 	/* A pointer to the sequent to the max element in the subtree */
 	bps_tree_elem_t *max_elem_copy;
+	/* Holder of max_elem_copy (block_id) */
+	bps_tree_block_id_t max_elem_block_id;
+	/* Holder of max_elem_copy (pos) */
+	bps_tree_pos_t max_elem_pos;
 };
 
 /**
@@ -1911,6 +1921,8 @@ bps_tree_collect_path(struct bps_tree *tree, bps_tree_elem_t new_elem,
 	struct bps_block *block = bps_tree_root(tree);
 	bps_tree_block_id_t block_id = tree->root_id;
 	bps_tree_elem_t *max_elem_copy = &tree->max_elem;
+	bps_tree_block_id_t max_elem_block_id = (bps_tree_block_id_t)-1;
+	bps_tree_pos_t max_elem_pos = (bps_tree_pos_t)-1;
 	for (bps_tree_block_id_t i = 0; i < tree->depth - 1; i++) {
 		struct bps_inner *inner = (struct bps_inner *)block;
 		bps_tree_pos_t pos;
@@ -1927,9 +1939,14 @@ bps_tree_collect_path(struct bps_tree *tree, bps_tree_elem_t new_elem,
 		path[i].pos_in_parent = prev_pos;
 		path[i].parent = prev_ext;
 		path[i].max_elem_copy = max_elem_copy;
+		path[i].max_elem_block_id = max_elem_block_id;
+		path[i].max_elem_pos = max_elem_pos;
 
-		if (pos < inner->header.size - 1)
+		if (pos < inner->header.size - 1) {
 			max_elem_copy = inner->elems + pos;
+			max_elem_block_id = block_id;
+			max_elem_pos = pos;
+		}
 		block_id = inner->child_ids[pos];
 		block = bps_tree_restore_block(tree, block_id);
 		prev_pos = pos;
@@ -1951,6 +1968,43 @@ bps_tree_collect_path(struct bps_tree *tree, bps_tree_elem_t new_elem,
 	leaf_path_elem->pos_in_parent = prev_pos;
 	leaf_path_elem->parent = prev_ext;
 	leaf_path_elem->max_elem_copy = max_elem_copy;
+	leaf_path_elem->max_elem_block_id = max_elem_block_id;
+	leaf_path_elem->max_elem_pos = max_elem_pos;
+}
+
+/**
+ * @brief Get new COW link to max_elem_copy member of leaf path
+ */
+static inline void
+bps_tree_touch_leaf_path_max_elem(struct bps_tree *tree,
+				  struct bps_leaf_path_elem *leaf_path_elem)
+{
+	if (leaf_path_elem->max_elem_block_id == (bps_tree_block_id_t)-1)
+		return;
+	struct bps_inner *holder = (struct bps_inner *)
+		bps_tree_touch_block(tree, leaf_path_elem->max_elem_block_id);
+	leaf_path_elem->max_elem_copy =
+		holder->elems + leaf_path_elem->max_elem_pos;
+}
+
+/**
+ * @brief Get new COW link to blocks and max_elem_copys
+ */
+static inline void
+bps_tree_touch_path(struct bps_tree *tree,
+			     struct bps_leaf_path_elem *leaf_path_elem)
+{
+	bps_tree_touch_leaf_path_max_elem(tree, leaf_path_elem);
+	for (bps_inner_path_elem *path = leaf_path_elem->parent;
+	     path; path = path->parent) {
+		path->block = (struct bps_inner *)
+			bps_tree_touch_block(tree, path->block_id);
+		if (path->max_elem_block_id == (bps_tree_block_id_t)-1)
+			continue;
+		struct bps_inner *holder = (struct bps_inner *)
+			bps_tree_touch_block(tree, path->max_elem_block_id);
+		path->max_elem_copy = holder->elems + path->max_elem_pos;
+	}
 }
 
 /**
@@ -1970,7 +2024,11 @@ bps_tree_process_replace(struct bps_tree *tree,
 		*replaced = leaf->elems[leaf_path_elem->insertion_point];
 
 	leaf->elems[leaf_path_elem->insertion_point] = new_elem;
-	*leaf_path_elem->max_elem_copy = leaf->elems[leaf->header.size - 1];
+	if (leaf_path_elem->insertion_point == leaf->header.size - 1) {
+		bps_tree_touch_leaf_path_max_elem(tree, leaf_path_elem);
+		*leaf_path_elem->max_elem_copy =
+			leaf->elems[leaf->header.size - 1];
+	}
 	return true;
 }
 
@@ -2093,7 +2151,11 @@ bps_tree_insert_into_leaf(struct bps_tree *tree,
 	BPS_TREE_DATAMOVE(leaf->elems + pos + 1, leaf->elems + pos,
 			  leaf->header.size - pos, leaf, leaf);
 	leaf->elems[pos] = new_elem;
-	*leaf_path_elem->max_elem_copy = leaf->elems[leaf->header.size];
+
+	if (pos == leaf->header.size) {
+		bps_tree_touch_leaf_path_max_elem(tree, leaf_path_elem);
+		*leaf_path_elem->max_elem_copy = leaf->elems[leaf->header.size];
+	}
 	leaf->header.size++;
 	tree->size++;
 }
@@ -2156,9 +2218,11 @@ bps_tree_delete_from_leaf(struct bps_tree *tree,
 
 	leaf->header.size--;
 
-	if (leaf->header.size > 0)
+	if (leaf->header.size > 0 && pos == leaf->header.size) {
+		bps_tree_touch_leaf_path_max_elem(tree, leaf_path_elem);
 		*leaf_path_elem->max_elem_copy =
 			leaf->elems[leaf->header.size - 1];
+	}
 
 	tree->size--;
 }
@@ -2956,10 +3020,12 @@ bps_tree_process_insert_leaf(struct bps_tree *tree,
 		BPS_TREE_BRANCH_TRACE(tree, insert_leaf, 1 << 0x0);
 		return 0;
 	}
-	struct bps_leaf_path_elem left_ext = {0, 0, 0, 0, 0, 0},
-			right_ext = {0, 0, 0, 0, 0, 0},
-			left_left_ext = {0, 0, 0, 0, 0, 0},
-			right_right_ext = {0, 0, 0, 0, 0, 0};
+	bps_tree_touch_path(tree, leaf_path_elem);
+
+	struct bps_leaf_path_elem left_ext = {0, 0, 0, 0, 0, 0, 0, 0},
+			right_ext = {0, 0, 0, 0, 0, 0, 0, 0},
+			left_left_ext = {0, 0, 0, 0, 0, 0, 0, 0},
+			right_right_ext = {0, 0, 0, 0, 0, 0, 0, 0};
 	bool has_left_ext =
 		bps_tree_collect_left_path_elem_leaf(tree, leaf_path_elem,
 						     &left_ext);
@@ -3284,10 +3350,10 @@ bps_tree_process_insert_inner(struct bps_tree *tree,
 		BPS_TREE_BRANCH_TRACE(tree, insert_inner, 1 << 0x0);
 		return 0;
 	}
-	bps_inner_path_elem left_ext = {0, 0, 0, 0, 0, 0},
-		right_ext = {0, 0, 0, 0, 0, 0},
-		left_left_ext = {0, 0, 0, 0, 0, 0},
-		right_right_ext = {0, 0, 0, 0, 0, 0};
+	bps_inner_path_elem left_ext = {0, 0, 0, 0, 0, 0, 0, 0},
+		right_ext = {0, 0, 0, 0, 0, 0, 0, 0},
+		left_left_ext = {0, 0, 0, 0, 0, 0, 0, 0},
+		right_right_ext = {0, 0, 0, 0, 0, 0, 0, 0};
 	bool has_left_ext =
 		bps_tree_collect_left_path_elem_inner(tree, inner_path_elem,
 						      &left_ext);
@@ -3606,10 +3672,12 @@ bps_tree_process_delete_leaf(struct bps_tree *tree,
 		return;
 	}
 
-	struct bps_leaf_path_elem left_ext = {0, 0, 0, 0, 0, 0},
-		right_ext = {0, 0, 0, 0, 0, 0},
-		left_left_ext = {0, 0, 0, 0, 0, 0},
-		right_right_ext = {0, 0, 0, 0, 0, 0};
+	bps_tree_touch_path(tree, leaf_path_elem);
+
+	struct bps_leaf_path_elem left_ext = {0, 0, 0, 0, 0, 0, 0, 0},
+		right_ext = {0, 0, 0, 0, 0, 0, 0, 0},
+		left_left_ext = {0, 0, 0, 0, 0, 0, 0, 0},
+		right_right_ext = {0, 0, 0, 0, 0, 0, 0, 0};
 	bool has_left_ext =
 		bps_tree_collect_left_path_elem_leaf(tree, leaf_path_elem,
 						     &left_ext);
@@ -3758,14 +3826,14 @@ bps_tree_process_delete_leaf(struct bps_tree *tree,
 		tree->first_id = leaf->next_id;
 	} else {
 		struct bps_leaf *prev_block = (struct bps_leaf *)
-			bps_tree_restore_block(tree, leaf->prev_id);
+			bps_tree_touch_block(tree, leaf->prev_id);
 		prev_block->next_id = leaf->next_id;
 	}
 	if (leaf->next_id == (bps_tree_block_id_t)(-1)) {
 		tree->last_id = leaf->prev_id;
 	} else {
 		struct bps_leaf *next_block = (struct bps_leaf *)
-			bps_tree_restore_block(tree, leaf->next_id);
+			bps_tree_touch_block(tree, leaf->next_id);
 		next_block->prev_id = leaf->prev_id;
 	}
 
@@ -3792,10 +3860,10 @@ bps_tree_process_delete_inner(struct bps_tree *tree,
 		return;
 	}
 
-	bps_inner_path_elem left_ext = {0, 0, 0, 0, 0, 0},
-		right_ext = {0, 0, 0, 0, 0, 0},
-		left_left_ext = {0, 0, 0, 0, 0, 0},
-		right_right_ext = {0, 0, 0, 0, 0, 0};
+	bps_inner_path_elem left_ext = {0, 0, 0, 0, 0, 0, 0, 0},
+		right_ext = {0, 0, 0, 0, 0, 0, 0, 0},
+		left_left_ext = {0, 0, 0, 0, 0, 0, 0, 0},
+		right_right_ext = {0, 0, 0, 0, 0, 0, 0, 0};
 	bool has_left_ext =
 		bps_tree_collect_left_path_elem_inner(tree, inner_path_elem,
 						      &left_ext);
@@ -4361,12 +4429,14 @@ bps_tree_debug_check_insert_into_leaf(struct bps_tree *tree, bool assertme)
 			struct bps_leaf_path_elem path_elem;
 			bps_tree_elem_t max;
 			bps_tree_elem_t ins;
-			bps_tree_debug_set_elem(&max, i + 1);
+			bps_tree_debug_set_elem(&max, i);
 			bps_tree_debug_set_elem(&ins, j);
 			path_elem.block = &block;
 			path_elem.block_id = 0;
 			path_elem.insertion_point = j;
 			path_elem.max_elem_copy = &max;
+			path_elem.max_elem_block_id = -1;
+			path_elem.max_elem_pos = -1;
 
 			bps_tree_insert_into_leaf(tree, &path_elem, ins);
 
@@ -4420,6 +4490,8 @@ bps_tree_debug_check_delete_from_leaf(struct bps_tree *tree, bool assertme)
 			path_elem.block_id = 0;
 			path_elem.insertion_point = j;
 			path_elem.max_elem_copy = &max;
+			path_elem.max_elem_block_id = -1;
+			path_elem.max_elem_pos = -1;
 
 			bps_tree_delete_from_leaf(tree, &path_elem);
 
@@ -4490,8 +4562,12 @@ bps_tree_debug_check_move_to_right_leaf(struct bps_tree *tree, bool assertme)
 					b_path_elem;
 				a_path_elem.block = &a;
 				a_path_elem.max_elem_copy = &ma;
+				a_path_elem.max_elem_block_id = -1;
+				a_path_elem.max_elem_pos = -1;
 				b_path_elem.block = &b;
 				b_path_elem.max_elem_copy = &mb;
+				b_path_elem.max_elem_block_id = -1;
+				b_path_elem.max_elem_pos = -1;
 				a_path_elem.block_id = 0;
 				b_path_elem.block_id = 0;
 
@@ -4583,8 +4659,12 @@ bps_tree_debug_check_move_to_left_leaf(struct bps_tree *tree, bool assertme)
 					b_path_elem;
 				a_path_elem.block = &a;
 				a_path_elem.max_elem_copy = &ma;
+				a_path_elem.max_elem_block_id = -1;
+				a_path_elem.max_elem_pos = -1;
 				b_path_elem.block = &b;
 				b_path_elem.max_elem_copy = &mb;
+				b_path_elem.max_elem_block_id = -1;
+				b_path_elem.max_elem_pos = -1;
 				a_path_elem.block_id = 0;
 				b_path_elem.block_id = 0;
 
@@ -4684,8 +4764,12 @@ bps_tree_debug_check_insert_and_move_to_right_leaf(struct bps_tree *tree,
 						b_path_elem;
 					a_path_elem.block = &a;
 					a_path_elem.max_elem_copy = &ma;
+					a_path_elem.max_elem_block_id = -1;
+					a_path_elem.max_elem_pos = -1;
 					b_path_elem.block = &b;
 					b_path_elem.max_elem_copy = &mb;
+					b_path_elem.max_elem_block_id = -1;
+					b_path_elem.max_elem_pos = -1;
 					a_path_elem.insertion_point = k;
 					a_path_elem.block_id = 0;
 					b_path_elem.block_id = 0;
@@ -4796,8 +4880,12 @@ bps_tree_debug_check_insert_and_move_to_left_leaf(struct bps_tree *tree,
 						b_path_elem;
 					a_path_elem.block = &a;
 					a_path_elem.max_elem_copy = &ma;
+					a_path_elem.max_elem_block_id = -1;
+					a_path_elem.max_elem_pos = -1;
 					b_path_elem.block = &b;
 					b_path_elem.max_elem_copy = &mb;
+					b_path_elem.max_elem_block_id = -1;
+					b_path_elem.max_elem_pos = -1;
 					b_path_elem.insertion_point = k;
 					a_path_elem.block_id = 0;
 					b_path_elem.block_id = 0;
@@ -4888,6 +4976,8 @@ bps_tree_debug_check_insert_into_inner(struct bps_tree *tree, bool assertme)
 			path_elem.block = &block;
 			path_elem.block_id = 0;
 			path_elem.max_elem_copy = &max;
+			path_elem.max_elem_block_id = -1;
+			path_elem.max_elem_pos = -1;
 
 			for (unsigned int k = 0; k < i; k++) {
 				if (k < j)
@@ -4954,6 +5044,8 @@ bps_tree_debug_check_delete_from_inner(struct bps_tree *tree, bool assertme)
 			path_elem.block_id = 0;
 			path_elem.insertion_point = j;
 			path_elem.max_elem_copy = &max;
+			path_elem.max_elem_block_id = -1;
+			path_elem.max_elem_pos = -1;
 
 			bps_tree_delete_from_inner(tree, &path_elem);
 
@@ -5012,8 +5104,12 @@ bps_tree_debug_check_move_to_right_inner(struct bps_tree *tree, bool assertme)
 				bps_inner_path_elem a_path_elem, b_path_elem;
 				a_path_elem.block = &a;
 				a_path_elem.max_elem_copy = &ma;
+				a_path_elem.max_elem_block_id = -1;
+				a_path_elem.max_elem_pos = -1;
 				b_path_elem.block = &b;
 				b_path_elem.max_elem_copy = &mb;
+				b_path_elem.max_elem_block_id = -1;
+				b_path_elem.max_elem_pos = -1;
 				a_path_elem.block_id = 0;
 				b_path_elem.block_id = 0;
 
@@ -5108,8 +5204,12 @@ bps_tree_debug_check_move_to_left_inner(struct bps_tree *tree, bool assertme)
 				bps_inner_path_elem a_path_elem, b_path_elem;
 				a_path_elem.block = &a;
 				a_path_elem.max_elem_copy = &ma;
+				a_path_elem.max_elem_block_id = -1;
+				a_path_elem.max_elem_pos = -1;
 				b_path_elem.block = &b;
 				b_path_elem.max_elem_copy = &mb;
+				b_path_elem.max_elem_block_id = -1;
+				b_path_elem.max_elem_pos = -1;
 				a_path_elem.block_id = 0;
 				b_path_elem.block_id = 0;
 
@@ -5210,8 +5310,12 @@ bps_tree_debug_check_insert_and_move_to_right_inner(struct bps_tree *tree,
 						b_path_elem;
 					a_path_elem.block = &a;
 					a_path_elem.max_elem_copy = &ma;
+					a_path_elem.max_elem_block_id = -1;
+					a_path_elem.max_elem_pos = -1;
 					b_path_elem.block = &b;
 					b_path_elem.max_elem_copy = &mb;
+					b_path_elem.max_elem_block_id = -1;
+					b_path_elem.max_elem_pos = -1;
 					a_path_elem.block_id = 0;
 					b_path_elem.block_id = 0;
 
@@ -5338,8 +5442,12 @@ bps_tree_debug_check_insert_and_move_to_left_inner(struct bps_tree *tree,
 						b_path_elem;
 					a_path_elem.block = &a;
 					a_path_elem.max_elem_copy = &ma;
+					a_path_elem.max_elem_block_id = -1;
+					a_path_elem.max_elem_pos = -1;
 					b_path_elem.block = &b;
 					b_path_elem.max_elem_copy = &mb;
+					b_path_elem.max_elem_block_id = -1;
+					b_path_elem.max_elem_pos = -1;
 					a_path_elem.block_id = 0;
 					b_path_elem.block_id = 0;
 
@@ -5529,6 +5637,8 @@ bps_tree_debug_check_internal_functions(bool assertme)
 #undef bps_tree_reserve_blocks
 #undef bps_tree_insert_first_elem
 #undef bps_tree_collect_path
+#undef bps_tree_touch_leaf_path_max_elem
+#undef bps_tree_touch_path
 #undef bps_tree_process_replace
 #undef bps_tree_debug_memmove
 #undef bps_tree_insert_into_leaf
diff --git a/test/unit/bps_tree_itr.cc b/test/unit/bps_tree_itr.cc
index 0c7deadf40..1e18b599bd 100644
--- a/test/unit/bps_tree_itr.cc
+++ b/test/unit/bps_tree_itr.cc
@@ -416,6 +416,9 @@ itr_freeze_check()
 			e.first = rand() % test_data_mod;
 			e.second = 0;
 			bps_tree_test_insert(&tree, e, 0);
+			int check = bps_tree_test_debug_check(&tree);
+			fail_if(check);
+			assert(check == 0);
 		}
 		struct bps_tree_test_iterator itr = bps_tree_test_itr_first(&tree);
 		elem_t *e;
@@ -432,6 +435,9 @@ itr_freeze_check()
 			e.first = rand() % test_data_mod;
 			e.second = 0;
 			bps_tree_test_insert(&tree, e, 0);
+			int check = bps_tree_test_debug_check(&tree);
+			fail_if(check);
+			assert(check == 0);
 		}
 		int tested_count = 0;
 		while ((e = bps_tree_test_itr_get_elem(&tree, &itr1))) {
@@ -450,6 +456,9 @@ itr_freeze_check()
 			e.first = rand() % test_data_mod;
 			e.second = 0;
 			bps_tree_test_delete(&tree, e);
+			int check = bps_tree_test_debug_check(&tree);
+			fail_if(check);
+			assert(check == 0);
 		}
 
 		tested_count = 0;
-- 
GitLab