From 15ccfa206ba8533addc4d8864bf20b4e27fd0def Mon Sep 17 00:00:00 2001
From: Dmitry Simonenko <pmwkaa@gmail.com>
Date: Tue, 3 Mar 2015 19:42:04 +0400
Subject: [PATCH] sophia-integration: implement join (master part)

---
 src/box/recovery.cc      |  3 +++
 src/box/recovery.h       |  1 +
 src/box/replication.cc   | 25 +++++++++++++++----------
 src/box/replication.h    |  3 +--
 src/box/sophia_engine.cc | 16 ++++++++--------
 third_party/sophia       |  2 +-
 6 files changed, 29 insertions(+), 21 deletions(-)

diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index 0cfb9cfeb3..53685c203a 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -181,6 +181,7 @@ recovery_new(const char *snap_dirname, const char *wal_dirname,
 	xdir_create(&r->wal_dir, wal_dirname, XLOG, &r->server_uuid);
 
 	vclock_create(&r->vclock);
+	vclock_create(&r->vclock_join);
 
 	xdir_scan(&r->snap_dir);
 	/**
@@ -373,7 +374,9 @@ recover_snap(struct recovery_state *r)
 
 	say_info("recovering from `%s'", snap->filename);
 	recover_xlog(r, snap);
+
 	/* Replace server vclock using the data from snapshot */
+	vclock_copy(&r->vclock_join, &r->vclock);
 	vclock_copy(&r->vclock, &snap->vclock);
 }
 
diff --git a/src/box/recovery.h b/src/box/recovery.h
index 58289bac9e..1be77446b9 100644
--- a/src/box/recovery.h
+++ b/src/box/recovery.h
@@ -79,6 +79,7 @@ struct remote {
 };
 
 struct recovery_state {
+	struct vclock vclock_join;
 	struct vclock vclock;
 	/** The WAL we're currently reading/writing from/to. */
 	struct xlog *current_wal;
diff --git a/src/box/replication.cc b/src/box/replication.cc
index b8c01bd02d..36cb629780 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -45,6 +45,10 @@
 #include "cfg.h"
 #include "trigger.h"
 
+void
+replication_send_row(struct recovery_state *r, void *param,
+                     struct xrow_header *packet);
+
 Relay::Relay(int fd_arg, uint64_t sync_arg)
 {
 	r = recovery_new(cfg_gets("snap_dir"), cfg_gets("wal_dir"),
@@ -59,10 +63,6 @@ Relay::~Relay()
 	recovery_delete(r);
 }
 
-void
-replication_send_row(struct recovery_state *r, void *param,
-                     struct xrow_header *packet);
-
 static inline void
 relay_set_cord_name(int fd)
 {
@@ -185,6 +185,15 @@ replication_subscribe(int fd, struct xrow_header *packet)
 	cord_cojoin(&cord);
 }
 
+void
+relay_send(Relay *relay, struct xrow_header *packet)
+{
+	packet->sync = relay->sync;
+	struct iovec iov[XROW_IOVMAX];
+	int iovcnt = xrow_to_iovec(packet, iov);
+	coio_writev(&relay->io, iov, iovcnt, 0);
+}
+
 /** Send a single row to the client. */
 void
 replication_send_row(struct recovery_state *r, void *param,
@@ -201,12 +210,8 @@ replication_send_row(struct recovery_state *r, void *param,
 	 * it not from the same server (i.e. don't send
 	 * replica's own rows back).
 	 */
-	if (packet->server_id == 0 || packet->server_id != r->server_id)  {
-		packet->sync = relay->sync;
-		struct iovec iov[XROW_IOVMAX];
-		int iovcnt = xrow_to_iovec(packet, iov);
-		coio_writev(&relay->io, iov, iovcnt, 0);
-	}
+	if (packet->server_id == 0 || packet->server_id != r->server_id)
+		relay_send(relay, packet);
 	/*
 	 * Update local vclock. During normal operation wal_write()
 	 * updates local vclock. In relay mode we have to update
diff --git a/src/box/replication.h b/src/box/replication.h
index 25cc32b265..72e5b2a9fa 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -55,8 +55,7 @@ void
 replication_subscribe(int fd, struct xrow_header *packet);
 
 void
-replication_send_row(struct recovery_state *r, void *param,
-                     struct xrow_header *packet);
+relay_send(Relay *relay, struct xrow_header *packet);
 
 #endif // TARANTOOL_REPLICATION_H_INCLUDED
 
diff --git a/src/box/sophia_engine.cc b/src/box/sophia_engine.cc
index fa69a03069..098b0ac02f 100644
--- a/src/box/sophia_engine.cc
+++ b/src/box/sophia_engine.cc
@@ -168,21 +168,19 @@ sophia_send_row(Relay *relay, uint32_t space_id, char *tuple,
 	body.k_tuple = IPROTO_TUPLE;
 	struct xrow_header row;
 	row.type = IPROTO_INSERT;
-	row.lsn = vclock_inc(&r->vclock, r->server_id);
-	row.server_id = r->server_id;
+	row.lsn = vclock_inc(&r->vclock_join, r->server_id);
+	row.server_id = 0;
 	row.bodycnt = 2;
 	row.body[0].iov_base = &body;
 	row.body[0].iov_len = sizeof(body);
 	row.body[1].iov_base = tuple;
 	row.body[1].iov_len = tuple_size;
-	replication_send_row(r, relay, &row);
+	relay_send(relay, &row);
 }
 
 void
 SophiaEngine::join(Relay *relay)
 {
-	return;
-
 	struct vclock *res = vclockset_last(&relay->r->snap_dir.index);
 	if (res == NULL)
 		tnt_raise(ClientError, ER_MISSING_SNAPSHOT);
@@ -197,7 +195,7 @@ SophiaEngine::join(Relay *relay)
 
 	/* iterate through a list of databases which took a
 	 * part in the snapshot */
-	void *db_cursor = sp_ctl(snapshot, "db_view");
+	void *db_cursor = sp_ctl(snapshot, "db_list");
 	if (db_cursor == NULL)
 		sophia_raise(env);
 	while (sp_get(db_cursor)) {
@@ -205,8 +203,10 @@ SophiaEngine::join(Relay *relay)
 
 		/* get space id */
 		void *dbctl = sp_ctl(db);
-		void *oid = sp_get(dbctl, "id");
-		uint32_t space_id = *(uint32_t*)sp_get(oid, "value", NULL);
+		void *oid = sp_get(dbctl, "name");
+		char *name = (char*)sp_get(oid, "value", NULL);
+		char *pe = NULL;
+		uint32_t space_id = strtoul(name, &pe, 10);
 		sp_destroy(oid);
 
 		/* send database */
diff --git a/third_party/sophia b/third_party/sophia
index dc65353f02..c6d5a1b6b1 160000
--- a/third_party/sophia
+++ b/third_party/sophia
@@ -1 +1 @@
-Subproject commit dc65353f02f023361bc331cb25477b6615e8a3fb
+Subproject commit c6d5a1b6b19adef6a47ff8ee49bbc3a2845b73ae
-- 
GitLab