From e35a8809dd702ce82d83d3f85fbd049ae18fc9c7 Mon Sep 17 00:00:00 2001
From: Aleksey Demakov <ademakov@gmail.com>
Date: Wed, 18 Jul 2012 16:52:51 +0400
Subject: [PATCH] Streamline row insertion for recovery from snapshot.

---
 mod/box/box.m   | 51 +++++++++++++++++++++++++------------------------
 mod/box/index.h |  1 +
 mod/box/index.m | 11 +++++++++++
 mod/box/tree.h  |  1 -
 mod/box/tree.m  | 13 -------------
 5 files changed, 38 insertions(+), 39 deletions(-)

diff --git a/mod/box/box.m b/mod/box/box.m
index f57713e72f..fc0763735e 100644
--- a/mod/box/box.m
+++ b/mod/box/box.m
@@ -250,23 +250,6 @@ xlog_print(struct tbuf *t)
 	return 0;
 }
 
-static struct tbuf *
-convert_snap_row_to_wal(struct tbuf *t)
-{
-	struct tbuf *r = tbuf_alloc(fiber->gc_pool);
-	struct box_snap_row *row = box_snap_row(t);
-	u16 op = REPLACE;
-	u32 flags = 0;
-
-	tbuf_append(r, &op, sizeof(op));
-	tbuf_append(r, &row->space, sizeof(row->space));
-	tbuf_append(r, &flags, sizeof(flags));
-	tbuf_append(r, &row->tuple_size, sizeof(row->tuple_size));
-	tbuf_append(r, row->data, row->data_size);
-
-	return r;
-}
-
 static void
 recovery_phase_1(void)
 {
@@ -284,6 +267,27 @@ recovery_phase_2(void)
 	end_build_primary_indexes();
 }
 
+static void
+recover_snap_row(struct tbuf *t)
+{
+	struct box_snap_row *row = box_snap_row(t);
+
+	struct tuple *tuple = tuple_alloc(row->data_size);
+	memcpy(tuple->data, row->data, row->data_size);
+	tuple->field_count = row->tuple_size;
+	tuple_ref(tuple, 1);
+
+	@try {
+		struct space *space = space_find(row->space);
+		Index *index = space->index[0];
+		[index buildNext: tuple];
+	}
+	@catch(...) {
+		tuple_ref(tuple, -1);
+		@throw;
+	}
+}
+
 static int
 recover_row(struct tbuf *t)
 {
@@ -296,22 +300,19 @@ recover_row(struct tbuf *t)
 		read_u64(t); /* drop cookie */
 		if (tag == SNAP) {
 			assert(recovery_phase == RECOVERY_PHASE_1);
-			t = convert_snap_row_to_wal(t);
+			recover_snap_row(t);
 		} else if (tag == XLOG) {
 			if (recovery_phase == RECOVERY_PHASE_1) {
 				recovery_phase_2();
 			}
+			u16 op = read_u16(t);
+			struct txn *txn = txn_begin();
+			txn->txn_flags |= BOX_NOT_STORE;
+			box_process_rw(txn, port_null, op, t);
 		} else {
 			say_error("unknown row tag: %i", (int)tag);
 			return -1;
 		}
-
-		u16 op = read_u16(t);
-
-		struct txn *txn = txn_begin();
-		txn->txn_flags |= BOX_NOT_STORE;
-
-		box_process_rw(txn, port_null, op, t);
 	}
 	@catch (id e) {
 		return -1;
diff --git a/mod/box/index.h b/mod/box/index.h
index 8ccddcac43..f58579d5ce 100644
--- a/mod/box/index.h
+++ b/mod/box/index.h
@@ -111,6 +111,7 @@ struct key_def {
  * Finish index construction.
  */
 - (void) buildBegin;
+- (void) buildNext: (struct tuple *)tuple;
 - (void) buildEnd;
 - (void) build: (Index *) pk;
 - (size_t) size;
diff --git a/mod/box/index.m b/mod/box/index.m
index 33e33429eb..38f252c6da 100644
--- a/mod/box/index.m
+++ b/mod/box/index.m
@@ -127,6 +127,12 @@ iterator_first_equal(struct iterator *it)
 	[self subclassResponsibility: _cmd];
 }
 
+- (void) buildNext: (struct tuple *)tuple
+{
+	(void) tuple;
+	[self subclassResponsibility: _cmd];
+}
+
 - (void) buildEnd
 {
 	[self subclassResponsibility: _cmd];
@@ -284,6 +290,11 @@ hash_iterator_free(struct iterator *iterator)
 {
 }
 
+- (void) buildNext: (struct tuple *)tuple
+{
+	[self replace: NULL :tuple];
+}
+
 - (void) buildEnd
 {
 }
diff --git a/mod/box/tree.h b/mod/box/tree.h
index 32189fad39..c0e88c763f 100644
--- a/mod/box/tree.h
+++ b/mod/box/tree.h
@@ -39,7 +39,6 @@ typedef int (*tree_cmp_t)(const void *, const void *, void *);
 @interface TreeIndex: Index {
 @public
 	sptree_index tree;
-	bool building;
 };
 
 + (Index *) alloc: (struct key_def *) key_def :(struct space *) space;
diff --git a/mod/box/tree.m b/mod/box/tree.m
index 6955e25266..c313bcfd7a 100644
--- a/mod/box/tree.m
+++ b/mod/box/tree.m
@@ -861,7 +861,6 @@ tree_iterator_free(struct iterator *iterator)
 	self = [super init: key_def_arg :space_arg];
 	if (self) {
 		memset(&tree, 0, sizeof tree);
-		building = false;
 	}
 	return self;
 }
@@ -899,9 +898,6 @@ tree_iterator_free(struct iterator *iterator)
 
 - (struct tuple *) findByTuple: (struct tuple *) tuple
 {
-	if (building)
-		return NULL;
-
 	struct key_data *key_data
 		= alloca(sizeof(struct key_data) + _SIZEOF_SPARSE_PARTS(tuple->field_count));
 
@@ -927,12 +923,6 @@ tree_iterator_free(struct iterator *iterator)
 		tnt_raise(ClientError, :ER_NO_SUCH_FIELD,
 			  key_def->max_fieldno);
 
-	if (building) {
-		assert(old_tuple == NULL);
-		[self buildNext: new_tuple];
-		return;
-	}
-
 	void *node = alloca([self node_size]);
 	if (old_tuple) {
 		[self fold: node :old_tuple];
@@ -986,8 +976,6 @@ tree_iterator_free(struct iterator *iterator)
 {
 	assert(index_is_primary(self));
 
-	building = true;
-
 	tree.size = 0;
 	tree.max_size = 64;
 
@@ -1026,7 +1014,6 @@ tree_iterator_free(struct iterator *iterator)
 	u32 estimated_tuples = tree.max_size;
 	void *nodes = tree.members;
 
-	building = false;
 	sptree_index_init(&tree,
 			  [self node_size], nodes, n_tuples, estimated_tuples,
 			  [self key_node_cmp], [self node_cmp],
-- 
GitLab