From 833178709142eafe2f59a6cfbfa6582946ae3108 Mon Sep 17 00:00:00 2001
From: Dmitry Simonenko <pmwkaa@gmail.com>
Date: Wed, 17 Dec 2014 20:24:46 +0400
Subject: [PATCH] sophia-snapshot: gh-556, gh-661

---
 src/box/box.cc           | 138 +++++++++++++++++++++++++++++++--------
 src/box/box.h            |   1 +
 src/box/engine.h         |   6 ++
 src/box/engine_memtx.cc  |  14 ++++
 src/box/engine_memtx.h   |   3 +
 src/box/engine_sophia.cc |  64 ++++++++++++++++++
 src/box/engine_sophia.h  |   3 +
 src/box/recovery.cc      |  42 ++++++++----
 src/box/recovery.h       |   7 +-
 src/box/sophia_index.cc  |   8 ---
 src/sophia/snapshot      | Bin 0 -> 44 bytes
 third_party/sophia       |   2 +-
 12 files changed, 239 insertions(+), 49 deletions(-)
 create mode 100644 src/sophia/snapshot

diff --git a/src/box/box.cc b/src/box/box.cc
index e7b2c45aff..88b0f07bbc 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -51,6 +51,7 @@
 #include "user.h"
 #include "cfg.h"
 #include "iobuf.h"
+#include "coeio.h"
 
 static void process_ro(struct port *port, struct request *request);
 static void process_rw(struct port *port, struct request *request);
@@ -446,14 +447,14 @@ box_init()
 		/* Initialize a new replica */
 		replica_bootstrap(recovery);
 		space_end_recover_snapshot();
-		snapshot_save(recovery);
+		box_deploy(recovery);
 	} else {
 		/* Initialize the first server of a new cluster */
 		recovery_bootstrap(recovery);
 		box_set_cluster_uuid();
 		box_set_server_uuid();
 		space_end_recover_snapshot();
-		snapshot_save(recovery);
+		box_deploy(recovery);
 	}
 	fiber_gc();
 
@@ -480,7 +481,6 @@ box_init()
 	iobuf_set_readahead(cfg_geti("readahead"));
 }
 
-
 void
 box_atfork()
 {
@@ -537,45 +537,129 @@ box_snapshot_cb(struct log_io *l)
 	space_foreach(snapshot_space, l);
 }
 
+static inline void
+box_snapshot_engine(EngineFactory *f, void *udate)
+{
+	/* udate points at the snapshot LSN handed to engine_foreach() */
+	f->snapshot(*(uint64_t *)udate);
+}
+
+static inline void
+box_snapshot_delete_engine(EngineFactory *f, void *udate)
+{
+	/* udate points at the LSN of the snapshot being rolled back */
+	f->snapshot_delete(*(uint64_t *)udate);
+}
+
+static inline void
+box_snapshot_complete_engine(EngineFactory *f, void *udate)
+{
+	uint64_t lsn = *(uint64_t*)udate;
+	while (! f->snapshot_ready(lsn)) /* poll the engine until ready */
+		fiber_yield_timeout(200000); /* 20 ms? TODO confirm unit */
+}
+
+static ssize_t
+box_snapshot_close_cb(va_list ap)
+{
+	struct log_io *log = va_arg(ap, struct log_io *);
+	int *close_rc = va_arg(ap, int *);
+	*close_rc = snapshot_close(log); /* result goes back to the caller */
+	return 0;
+}
+
 int
 box_snapshot(void)
 {
 	if (snapshot_pid)
 		return EINPROGRESS;
 
-	/* flush buffers to avoid multiple output */
-	/* https://github.com/tarantool/tarantool/issues/366 */
+	/* flush buffers to avoid multiple output:
+	 * https://github.com/tarantool/tarantool/issues/366 */
 	fflush(stdout);
 	fflush(stderr);
-	pid_t p = fork();
-	if (p < 0) {
+
+	/* create snapshot file */
+	int rc, status;
+	int result = -1; /* what the failure paths below return */
+	pid_t pid;
+	uint64_t snap_lsn = vclock_signature(&recovery->vclock);
+	struct log_io *snap = snapshot_create(recovery);
+	if (snap == NULL)
+		return errno;
+
+	/* create engine snapshot */
+	engine_foreach(box_snapshot_engine, &snap_lsn);
+
+	/* Due to fork nature, no threads are recreated.
+	 * This is the only consistency guarantee here for a
+	 * multi-threaded engine. */
+	pid = fork();
+	switch (pid) {
+	case -1:
 		say_syserror("fork");
-		return -1;
+		goto rollback;
+	case  0: /* dumper */
+		slab_arena_mprotect(&memtx_arena);
+		cord_set_name("snap");
+		title("dumper", "%" PRIu32, getppid());
+		fiber_set_name(fiber(), "dumper");
+		/* make sure we don't double-write parent stdio
+		 * buffers at exit() during panic */
+		close_all_xcpt(2, log_fd, fileno(snap->f));
+		snapshot_write(recovery, snap);
+		exit(EXIT_SUCCESS);
+		return 0;
+	default: /* waiter */
+		snapshot_pid = pid;
 	}
-	if (p > 0) {
-		snapshot_pid = p;
-		/* increment snapshot version */
-		tuple_begin_snapshot();
-		int status = wait_for_child(p);
-		tuple_end_snapshot();
-		snapshot_pid = 0;
-		return (WIFSIGNALED(status) ? EINTR : WEXITSTATUS(status));
+
+	/* increment snapshot version */
+	tuple_begin_snapshot();
+
+	/* wait for memtx-part snapshot completion */
+	status = wait_for_child(pid);
+	if (WIFSIGNALED(status))
+		status = EINTR;
+	else
+		status = WEXITSTATUS(status);
+	if (status != 0) {
+		result = status;
+		goto release;
 	}
 
-	slab_arena_mprotect(&memtx_arena);
+	/* wait for engine snapshot completion */
+	engine_foreach(box_snapshot_complete_engine, &snap_lsn);
 
-	cord_set_name("snap");
-	title("dumper", "%" PRIu32, getppid());
-	fiber_set_name(fiber(), "dumper");
-	/*
-	 * Safety: make sure we don't double-write
-	 * parent stdio buffers at exit().
-	 */
-	close_all_xcpt(1, log_fd);
-	snapshot_save(recovery);
+	/* wait for snapshot close/sync */
+	rc = coeio_custom(box_snapshot_close_cb, TIMEOUT_INFINITY,
+	                  snap, &status);
+	if (rc == -1 || status == -1)
+		goto release;
 
-	exit(EXIT_SUCCESS);
+	tuple_end_snapshot();
+	snapshot_pid = 0;
 	return 0;
+release:
+	/* end the snapshot version and unblock later box_snapshot() calls */
+	tuple_end_snapshot();
+	snapshot_pid = 0;
+rollback:
+	/* rollback engine snapshots; NOTE(review): snap stays open here */
+	engine_foreach(box_snapshot_delete_engine, &snap_lsn);
+	return result;
+}
+
+void
+box_deploy(struct recovery_state *r)
+{
+	/* create memtx snapshot */
+	snapshot_save(r);
+
+	/* create engine snapshot and wait for its completion */
+	uint64_t snap_lsn = vclock_signature(&r->vclock);
+	engine_foreach(box_snapshot_engine, &snap_lsn);
+	engine_foreach(box_snapshot_complete_engine, &snap_lsn);
 }
 
 const char *
diff --git a/src/box/box.h b/src/box/box.h
index 23ad5451e5..eb2121a7b6 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -83,6 +83,7 @@ extern uint32_t snapshot_version;
  * snapshot file.
  */
 int box_snapshot(void);
+void box_deploy(struct recovery_state*);
 
 /** Basic initialization of the storage dir. */
 void
diff --git a/src/box/engine.h b/src/box/engine.h
index 478ea0e043..17fe42f987 100644
--- a/src/box/engine.h
+++ b/src/box/engine.h
@@ -128,6 +128,12 @@ class EngineFactory: public Object {
 	virtual void begin(struct txn*, struct space*);
 	virtual void commit(struct txn*);
 	virtual void rollback(struct txn*);
+	/**
+	 * Engine snapshotting support.
+	 */
+	virtual void snapshot(uint64_t) = 0;
+	virtual int  snapshot_ready(uint64_t) = 0;
+	virtual void snapshot_delete(uint64_t) = 0;
 public:
 	/** Name of the engine. */
 	const char *name;
diff --git a/src/box/engine_memtx.cc b/src/box/engine_memtx.cc
index 2118b15d14..50a3350139 100644
--- a/src/box/engine_memtx.cc
+++ b/src/box/engine_memtx.cc
@@ -212,3 +212,17 @@ void MemtxFactory::rollback(struct txn *txn)
 		}
 	}
 }
+
+/*
+ * memtx snapshotting is supported directly by box,
+ * so these engine hooks do nothing.
+ */
+void MemtxFactory::snapshot(uint64_t /* lsn */) {
+}
+
+int MemtxFactory::snapshot_ready(uint64_t) {
+	return 1; /* always reported as ready */
+}
+
+void MemtxFactory::snapshot_delete(uint64_t) {
+}
diff --git a/src/box/engine_memtx.h b/src/box/engine_memtx.h
index d9fe35bfb3..1c2c8c2115 100644
--- a/src/box/engine_memtx.h
+++ b/src/box/engine_memtx.h
@@ -37,6 +37,9 @@ struct MemtxFactory: public EngineFactory {
 	virtual void keydefCheck(struct key_def *key_def);
 	virtual void recoveryEvent(enum engine_recovery_event event);
 	virtual void rollback(struct txn*);
+	virtual void snapshot(uint64_t);
+	virtual int  snapshot_ready(uint64_t);
+	virtual void snapshot_delete(uint64_t);
 };
 
 #endif /* TARANTOOL_BOX_ENGINE_MEMTX_H_INCLUDED */
diff --git a/src/box/engine_sophia.cc b/src/box/engine_sophia.cc
index efccbb2a58..986934ef56 100644
--- a/src/box/engine_sophia.cc
+++ b/src/box/engine_sophia.cc
@@ -88,7 +88,9 @@ sophia_recovery_end(struct space *space)
 	r->state   = READY_ALL_KEYS;
 	r->replace = sophia_replace;
 	r->recover = space_noop;
+	/*
 	sophia_complete_recovery(space);
+	*/
 }
 
 static void
@@ -152,6 +154,10 @@ SophiaFactory::recoveryEvent(enum engine_recovery_event event)
 		recovery.recover = sophia_recovery_end_snapshot;
 		break;
 	case END_RECOVERY:
+		/* complete two-phase recovery */
+		int rc = sp_open(env);
+		if (rc == -1)
+			sophia_raise(env);
 		recovery.state   = READY_NO_KEYS;
 		recovery.replace = sophia_replace;
 		recovery.recover = space_noop;
@@ -320,3 +326,61 @@ SophiaFactory::rollback(struct txn *)
 	});
 	sp_rollback(tx);
 }
+
+/* Engine snapshot hook: start a checkpoint and register a sophia
+ * snapshot named after lsn. */
+void SophiaFactory::snapshot(uint64_t lsn)
+{
+	void *ctl = sp_ctl(env);
+	/* start asynchronous checkpoint */
+	if (sp_set(ctl, "scheduler.checkpoint") == -1)
+		sophia_raise(env);
+	/* create snapshot; its name is the decimal lsn */
+	char name[32];
+	snprintf(name, sizeof(name), "%" PRIu64, lsn);
+	if (sp_set(ctl, "snapshot", name) == -1)
+		sophia_raise(env);
+}
+
+/* Return non-zero once the last completed checkpoint covers the snapshot. */
+int SophiaFactory::snapshot_ready(uint64_t lsn)
+{
+	/* get sophia lsn associated with snapshot */
+	char snapshot[64]; /* "snapshot." + 20 digits + ".lsn" + NUL = 34 */
+	snprintf(snapshot, sizeof(snapshot), "snapshot.%" PRIu64 ".lsn", lsn);
+	void *c = sp_ctl(env);
+	void *o = sp_get(c, snapshot);
+	if (o == NULL) {
+		if (sp_error(env))
+			sophia_raise(env);
+		panic("sophia snapshot %" PRIu64 " does not exist", lsn);
+	}
+	char *p = (char *)sp_get(o, "value", NULL);
+	uint64_t snapshot_start_lsn = strtoull(p, NULL, 10);
+	sp_destroy(o);
+
+	/* compare with a latest completed checkpoint lsn */
+	o = sp_get(c, "scheduler.checkpoint_lsn_last");
+	if (o == NULL)
+		sophia_raise(env);
+	p = (char *)sp_get(o, "value", NULL);
+	uint64_t last_lsn = strtoull(p, NULL, 10);
+	sp_destroy(o);
+	return last_lsn >= snapshot_start_lsn;
+}
+
+/* Engine snapshot hook: drop the sophia snapshot registered for lsn. */
+void SophiaFactory::snapshot_delete(uint64_t lsn)
+{
+	char key[32];
+	snprintf(key, sizeof(key), "snapshot.%" PRIu64, lsn);
+	void *ctl = sp_ctl(env);
+	void *snap = sp_get(ctl, key);
+	if (snap == NULL) {
+		if (sp_error(env))
+			sophia_raise(env);
+		panic("sophia snapshot %" PRIu64 " does not exist", lsn);
+	}
+	if (sp_delete(snap) == -1)
+		sophia_raise(env);
+}
diff --git a/src/box/engine_sophia.h b/src/box/engine_sophia.h
index e37ba9f74e..8f8b451b5f 100644
--- a/src/box/engine_sophia.h
+++ b/src/box/engine_sophia.h
@@ -40,6 +40,9 @@ struct SophiaFactory: public EngineFactory {
 	virtual void commit(struct txn*);
 	virtual void rollback(struct txn*);
 	virtual void recoveryEvent(enum engine_recovery_event);
+	virtual void snapshot(uint64_t);
+	virtual int  snapshot_ready(uint64_t);
+	virtual void snapshot_delete(uint64_t);
 	void *env;
 	void *tx;
 };
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index de7e76240a..7faeb0b75b 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -1174,27 +1174,45 @@ snapshot_write_row(struct recovery_state *r, struct log_io *l,
 	}
 }
 
-void
-snapshot_save(struct recovery_state *r)
+struct log_io *
+snapshot_create(struct recovery_state *r)
 {
 	assert(r->snapshot_handler != NULL);
 	struct log_io *snap = log_io_open_for_write(&r->snap_dir,
 		&r->server_uuid, &r->vclock);
-	if (snap == NULL)
-		panic_status(errno, "Failed to save snapshot: failed to open file in write mode.");
-	/*
-	 * While saving a snapshot, snapshot name is set to
-	 * <lsn>.snap.inprogress. When done, the snapshot is
-	 * renamed to <lsn>.snap.
-	 */
+	return snap; /* NULL if the file could not be opened */
+}
+
+void
+snapshot_write(struct recovery_state *r, struct log_io *snap)
+{
 	say_info("saving snapshot `%s'", snap->filename);
 
 	r->snapshot_handler(snap);
+}
 
-	log_io_close(&snap);
+int
+snapshot_close(struct log_io *snap)
+{
+	int rc = log_io_close(&snap);
+	if (rc == 0)
+		say_info("done");
+	return rc; /* log_io_close() result; 0 is logged as "done" */
+}
 
-	say_info("done");
+void
+snapshot_save(struct recovery_state *r)
+{
+	/*
+	 * While saving, the snapshot is named <lsn>.snap.inprogress;
+	 * when done, it is renamed to <lsn>.snap.
+	 * NOTE(review): the snapshot_close() result is ignored below.
+	 */
+	struct log_io *snap = snapshot_create(r);
+	if (snap == NULL)
+		panic_status(errno, "Failed to save snapshot: failed to open file in write mode.");
+	snapshot_write(r, snap);
+	snapshot_close(snap);
 }
 
 /* }}} */
-
diff --git a/src/box/recovery.h b/src/box/recovery.h
index 4f1628064a..fb2d7af866 100644
--- a/src/box/recovery.h
+++ b/src/box/recovery.h
@@ -163,7 +163,12 @@ struct fio_batch;
 
 void
 snapshot_write_row(struct recovery_state *r, struct log_io *l,
-		   struct xrow_header *packet);
+                   struct xrow_header *packet);
+
+struct log_io*
+snapshot_create(struct recovery_state *r);
+void snapshot_write(struct recovery_state *r, struct log_io *snap);
+int  snapshot_close(struct log_io *snap);
 void snapshot_save(struct recovery_state *r);
 
 #if defined(__cplusplus)
diff --git a/src/box/sophia_index.cc b/src/box/sophia_index.cc
index ebe9abb726..46744c2ff5 100644
--- a/src/box/sophia_index.cc
+++ b/src/box/sophia_index.cc
@@ -165,14 +165,6 @@ SophiaIndex::SophiaIndex(struct key_def *key_def_arg __attribute__((unused)))
 	int rc = sp_open(db);
 	if (rc == -1)
 		sophia_raise(env);
-	/* auto-complete any space created
-	 * after recovery */
-	engine_recovery *r = &factory->recovery;
-	if (r->recover == space_noop) {
-		rc = sp_open(db);
-		if (rc == -1)
-			sophia_raise(env);
-	}
 	tuple_format_ref(space->format, 1);
 }
 
diff --git a/src/sophia/snapshot b/src/sophia/snapshot
new file mode 100644
index 0000000000000000000000000000000000000000..b72acae8ce3df9d43941b6fc0f964265f090b24a
GIT binary patch
literal 44
acmXs2J6nO70Rfm84B#9l1|uZ4F#`ZwhXRfO

literal 0
HcmV?d00001

diff --git a/third_party/sophia b/third_party/sophia
index f3fe222f99..aa631d6eb4 160000
--- a/third_party/sophia
+++ b/third_party/sophia
@@ -1 +1 @@
-Subproject commit f3fe222f99d5c6734e6171a2df635aa3d041146c
+Subproject commit aa631d6eb47e22a69638187251e00c5a927bdae2
-- 
GitLab