From 3f5fe7913a9dfc6d09306e9b1c160e9a602e03b8 Mon Sep 17 00:00:00 2001
From: Dmitry Simonenko <pmwkaa@gmail.com>
Date: Fri, 27 Feb 2015 19:15:31 +0400
Subject: [PATCH] sophia-integration: join implementation (first part)

---
 src/box/engine.cc        |  9 +++++
 src/box/engine.h         |  8 +++++
 src/box/memtx_engine.cc  |  6 ++++
 src/box/memtx_engine.h   |  1 +
 src/box/replication.cc   | 12 ++++---
 src/box/replication.h    |  4 +++
 src/box/sophia_engine.cc | 73 ++++++++++++++++++++++++++++++++++++++++
 src/box/sophia_engine.h  |  1 +
 third_party/sophia       |  2 +-
 9 files changed, 110 insertions(+), 6 deletions(-)

diff --git a/src/box/engine.cc b/src/box/engine.cc
index cd3d53fcbf..398cd3c70d 100644
--- a/src/box/engine.cc
+++ b/src/box/engine.cc
@@ -179,3 +179,12 @@ engine_checkpoint(int64_t checkpoint_id)
 	snapshot_is_in_progress = false;
 	return save_errno;
 }
+
+void
+engine_join(struct recovery_state *r)
+{
+	Engine *engine;
+	engine_foreach(engine) {
+		engine->join(r);
+	}
+}
diff --git a/src/box/engine.h b/src/box/engine.h
index 0498e40e27..2af199922b 100644
--- a/src/box/engine.h
+++ b/src/box/engine.h
@@ -118,6 +118,8 @@ class Engine: public Object {
 	 * Delete all tuples in the index on drop.
 	 */
 	virtual void dropIndex(Index*) = 0;
+
+	virtual void join(struct recovery_state*) = 0;
 	/**
 	 * Engine specific transaction life-cycle routines.
 	 */
@@ -254,4 +256,10 @@ engine_end_recovery();
 int
 engine_checkpoint(int64_t checkpoint_id);
 
+/**
+ * Send a snapshot.
+ */
+void
+engine_join(struct recovery_state*);
+
 #endif /* TARANTOOL_BOX_ENGINE_H_INCLUDED */
diff --git a/src/box/memtx_engine.cc b/src/box/memtx_engine.cc
index b64baafd65..0c3887196c 100644
--- a/src/box/memtx_engine.cc
+++ b/src/box/memtx_engine.cc
@@ -111,6 +111,12 @@ MemtxEngine::end_recovery()
 	recovery.recover = space_build_all_keys;
 }
 
+void
+MemtxEngine::join(struct recovery_state *r)
+{
+	recover_snap(r);
+}
+
 Handler *MemtxEngine::open()
 {
 	return new MemtxSpace(this);
diff --git a/src/box/memtx_engine.h b/src/box/memtx_engine.h
index 127633f526..38c5c88ad3 100644
--- a/src/box/memtx_engine.h
+++ b/src/box/memtx_engine.h
@@ -40,6 +40,7 @@ struct MemtxEngine: public Engine {
 	virtual void begin_recover_snapshot(int64_t lsn);
 	virtual void end_recover_snapshot();
 	virtual void end_recovery();
+	virtual void join(struct recovery_state*);
 	virtual int begin_checkpoint(int64_t);
 	virtual int wait_checkpoint();
 	virtual void commit_checkpoint();
diff --git a/src/box/replication.cc b/src/box/replication.cc
index d292c432cb..667b5b1e8b 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -34,6 +34,7 @@
 #include "xlog.h"
 #include "evio.h"
 #include "iproto_constants.h"
+#include "box/engine.h"
 #include "box/cluster.h"
 #include "box/schema.h"
 #include "box/vclock.h"
@@ -44,9 +45,9 @@
 #include "cfg.h"
 #include "trigger.h"
 
-static void
+void
 replication_send_row(struct recovery_state *r, void *param,
-		     struct xrow_header *packet);
+                     struct xrow_header *packet);
 
 /** State of a replication relay. */
 class Relay {
@@ -90,8 +91,9 @@ replication_join_f(va_list ap)
 	struct recovery_state *r = relay->r;
 
 	relay_set_cord_name(relay->io.fd);
+
 	/* Send snapshot */
-	recover_snap(r);
+	engine_join(r);
 
 	/* Send response to JOIN command = end of stream */
 	struct xrow_header row;
@@ -193,9 +195,9 @@ replication_subscribe(int fd, struct xrow_header *packet)
 }
 
 /** Send a single row to the client. */
-static void
+void
 replication_send_row(struct recovery_state *r, void *param,
-		     struct xrow_header *packet)
+                     struct xrow_header *packet)
 {
 	Relay *relay = (Relay *) param;
 	assert(iproto_type_is_dml(packet->type));
diff --git a/src/box/replication.h b/src/box/replication.h
index d46fa04117..354553186d 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -41,5 +41,9 @@ replication_join(int fd, struct xrow_header *packet);
 void
 replication_subscribe(int fd, struct xrow_header *packet);
 
+void
+replication_send_row(struct recovery_state *r, void *param,
+                     struct xrow_header *packet);
+
 #endif // TARANTOOL_REPLICATION_H_INCLUDED
 
diff --git a/src/box/sophia_engine.cc b/src/box/sophia_engine.cc
index e1e0e41b69..063601241c 100644
--- a/src/box/sophia_engine.cc
+++ b/src/box/sophia_engine.cc
@@ -34,7 +34,11 @@
 #include "txn.h"
 #include "index.h"
 #include "sophia_index.h"
+#include "recovery.h"
 #include "space.h"
+#include "request.h"
+#include "iproto_constants.h"
+#include "replication.h"
 #include "salad/rlist.h"
 #include <sophia.h>
 #include <stdlib.h>
@@ -152,6 +156,75 @@ SophiaEngine::end_recover_snapshot()
 	recovery.recover = sophia_recovery_end_snapshot;
 }
 
+static inline void
+sophia_send_row(struct recovery_state *r, char *tuple,
+                uint32_t tuple_size)
+{
+	struct request req;
+	request_create(&req, IPROTO_REPLACE);
+	req.space_id  = 0;
+	req.index_id  = 0;
+	req.tuple     = tuple;
+	req.tuple_end = tuple + tuple_size;
+
+	struct xrow_header row;
+	row.type = IPROTO_REPLACE;
+	row.lsn = 0;
+	row.server_id = 0;
+	row.bodycnt = request_encode(&req, row.body);
+
+	replication_send_row(r, /* Relay* */ NULL, &row);
+}
+
+void
+SophiaEngine::join(struct recovery_state *r)
+{
+
+	return;
+
+	struct vclock *res = vclockset_last(&r->snap_dir.index);
+	if (res == NULL)
+		tnt_raise(ClientError, ER_MISSING_SNAPSHOT);
+	int64_t signt = vclock_signature(res);
+
+	/* get snapshot object */
+	char id[128];
+	snprintf(id, sizeof(id), "snapshot.%" PRIu64, signt);
+	void *c = sp_ctl(env);
+	void *snapshot = sp_get(c, id);
+	assert(snapshot != NULL);
+
+	/* iterate through a list of databases which took a
+	 * part in the snapshot */
+	void *db_cursor = sp_ctl(snapshot, "db_view");
+	if (db_cursor == NULL)
+		sophia_raise(env);
+	while (sp_get(db_cursor)) {
+		void *db = sp_object(db_cursor);
+		/* send database */
+		void *o = sp_object(db);
+		void *cursor = sp_cursor(snapshot, o);
+		if (cursor == NULL) {
+			sp_destroy(db_cursor);
+			sophia_raise(env);
+		}
+		while (sp_get(cursor)) {
+			o = sp_object(cursor);
+			uint32_t tuple_size = 0;
+			char *tuple = (char *)sp_get(o, "value", &tuple_size);
+			try {
+				sophia_send_row(r, tuple, tuple_size);
+			} catch (...) {
+				sp_destroy(cursor);
+				sp_destroy(db_cursor);
+				throw;
+			}
+		}
+		sp_destroy(cursor);
+	}
+	sp_destroy(db_cursor);
+}
+
 static inline void
 sophia_snapshot_recover(void *env, int64_t lsn);
 
diff --git a/src/box/sophia_engine.h b/src/box/sophia_engine.h
index ca037d4e7a..7c1c8dd6e4 100644
--- a/src/box/sophia_engine.h
+++ b/src/box/sophia_engine.h
@@ -43,6 +43,7 @@ struct SophiaEngine: public Engine {
 	virtual void begin_recover_snapshot(int64_t);
 	virtual void end_recover_snapshot();
 	virtual void end_recovery();
+	virtual void join(struct recovery_state*);
 	virtual int begin_checkpoint(int64_t);
 	virtual int wait_checkpoint();
 	virtual void commit_checkpoint();
diff --git a/third_party/sophia b/third_party/sophia
index 6b0e6e7a4a..dc65353f02 160000
--- a/third_party/sophia
+++ b/third_party/sophia
@@ -1 +1 @@
-Subproject commit 6b0e6e7a4a25aafb5eb2031c4ae9a810944975e3
+Subproject commit dc65353f02f023361bc331cb25477b6615e8a3fb
-- 
GitLab