From 2fca5c133b512ab40a5b42603fe27dd7e7db8bae Mon Sep 17 00:00:00 2001
From: Nikita Zheleztsov <n.zheleztsov@proton.me>
Date: Tue, 16 Jul 2024 18:56:35 +0300
Subject: [PATCH] engine: introduce stubs for checkpoint FETCH_SNAPSHOT

This commit introduces engine stubs that enable a new method
of fetching snapshots for anonymous replicas. Instead of using
the traditional read-view join approach, this update allows
file snapshot fetching. Note that file snapshot fetching
is only available in Tarantool EE.

Checkpoint fetching is done via IPROTO_IS_CHECKPOINT_JOIN,
IPROTO_CHECKPOINT_VCLOCK and IPROTO_CHECKPOINT_LSN fields.

If IPROTO_IS_CHECKPOINT_JOIN is set to true, the join is done from
files (.snap for memtx, .run for vinyl); if false, from a read view.

Checkpoint join allows the client to continue from the place where it
stopped if snapshot fetching fails, which avoids a rebootstrap of the
anonymous client. To do so, the client specifies CHECKPOINT_VCLOCK,
which tells the server which checkpoint file to continue the join
from; the client receives this vclock at the beginning of the join.
Specifying CHECKPOINT_LSN allows continuing from a given position in
the checkpoint: the server sends all data >= CHECKPOINT_LSN.

If CHECKPOINT_VCLOCK is not specified, fetching is done from the latest
available checkpoint. If CHECKPOINT_LSN is not specified, fetching
starts from the beginning of the snapshot. So, specifying only
IS_CHECKPOINT_JOIN triggers fetching the latest checkpoint from files.

Needed for tarantool/tarantool-ee#741

NO_DOC=ee
NO_TEST=ee
NO_CHANGELOG=ee
---
 src/box/applier.cc         |  6 +++++
 src/box/box.cc             | 17 +++++++++++---
 src/box/engine.h           | 14 ++++++++++++
 src/box/iproto_constants.h | 17 +++++++++++++-
 src/box/memtx_engine.cc    | 45 ++++++++++++++++++++++++++++++++++++++
 src/box/relay.cc           |  4 +++-
 src/box/relay.h            |  5 ++++-
 src/box/vinyl.c            | 44 +++++++++++++++++++++++++++++++++++++
 src/box/xrow.c             | 45 ++++++++++++++++++++++++++++++++++++++
 src/box/xrow.h             |  6 +++++
 src/trivia/config.h.cmake  |  1 +
 11 files changed, 198 insertions(+), 6 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 2368af2cb5..b0df83526c 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -870,8 +870,14 @@ applier_fetch_snapshot(struct applier *applier)
 	struct iostream *io = &applier->io;
 	struct xrow_header row;
 
+	struct vclock vclock;
+	vclock_create(&vclock);
 	struct fetch_snapshot_request req = {
 		.version_id = tarantool_version_id(),
+		/* Applier doesn't support checkpoint join. */
+		.is_checkpoint_join = false,
+		.checkpoint_vclock = vclock,
+		.checkpoint_lsn = 0,
 	};
 	RegionGuard region_guard(&fiber()->gc);
 	xrow_encode_fetch_snapshot(&row, &req);
diff --git a/src/box/box.cc b/src/box/box.cc
index a1e8d00716..d2c5c18ab7 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -4332,11 +4332,21 @@ box_process_fetch_snapshot(struct iostream *io,
 			  "wal_mode = 'none'");
 	}
 
-	say_info("sending current read-view to replica at %s", sio_socketname(io->fd));
+	say_info("sending read-view to replica at %s", sio_socketname(io->fd));
+	/* Used for checkpoint initial join. */
+	struct checkpoint_cursor cursor;
+	struct checkpoint_cursor *cursor_ptr = NULL;
+	if (req.is_checkpoint_join) {
+		memset(&cursor, 0, sizeof(cursor));
+		cursor.vclock = &req.checkpoint_vclock;
+		cursor.start_lsn = req.checkpoint_lsn;
+		cursor_ptr = &cursor;
+	}
 
 	/* Send the snapshot data to the instance. */
 	struct vclock start_vclock;
-	relay_initial_join(io, header->sync, &start_vclock, req.version_id);
+	relay_initial_join(io, header->sync, &start_vclock, req.version_id,
+			   cursor_ptr);
 	say_info("read-view sent.");
 
 	/* Remember master's vclock after the last request */
@@ -4587,7 +4597,8 @@ box_process_join(struct iostream *io, const struct xrow_header *header)
 	 * Initial stream: feed replica with dirty data from engines.
 	 */
 	struct vclock start_vclock;
-	relay_initial_join(io, header->sync, &start_vclock, req.version_id);
+	relay_initial_join(io, header->sync, &start_vclock, req.version_id,
+			   NULL);
 	say_info("initial data sent.");
 	/**
 	 * Register the replica after sending the last row but before sending
diff --git a/src/box/engine.h b/src/box/engine.h
index 13dd3938e5..1ca8e5bfd5 100644
--- a/src/box/engine.h
+++ b/src/box/engine.h
@@ -315,11 +315,25 @@ struct engine_read_view {
 	struct rlist link;
 };
 
+/**
+ * Cursor used during checkpoint initial join. Shared between engines.
+ */
+struct checkpoint_cursor {
+	/** Vclock (signature) of the checkpoint to take data from. */
+	struct vclock *vclock;
+	/** Checkpoint lsn to start from; rows >= start_lsn are sent. */
+	int64_t start_lsn;
+	/** Counter shared between engines; zeroed by the request handler. */
+	int64_t lsn_counter;
+};
+
 struct engine_join_ctx {
 	/** Vclock to respond with. */
 	struct vclock *vclock;
 	/** Whether sending JOIN_META stage is required. */
 	bool send_meta;
+	/** Checkpoint join cursor, NULL for a read-view join. */
+	struct checkpoint_cursor *cursor;
 	/** Array of engine join contexts, one per each engine. */
 	void **data;
 };
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index df6795650d..3986843f95 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -218,7 +218,22 @@ extern const char *iproto_flag_bit_strs[];
 	/**
 	 * Flag indicating whether the transaction is synchronous.
 	 */								\
-	 _(IS_SYNC, 0x61, MP_BOOL)
+	 _(IS_SYNC, 0x61, MP_BOOL)					\
+	 /**
+	  * Flag indicating whether checkpoint join should be done.
+	  */								\
+	 _(IS_CHECKPOINT_JOIN, 0x62, MP_BOOL)				\
+	 /**
+	  * Shows the signature of the checkpoint to read from.
+	  * Requires IS_CHECKPOINT_JOIN to be true.
+	  */								\
+	 _(CHECKPOINT_VCLOCK, 0x63, MP_MAP)				\
+	 /**
+	  * Shows the lsn to start sending from. Server sends all rows
+	  * >= IPROTO_CHECKPOINT_LSN. Requires IS_CHECKPOINT_JOIN to be
+	  * true and CHECKPOINT_VCLOCK to be set.
+	  */								\
+	 _(CHECKPOINT_LSN, 0x64, MP_UINT)				\
 
 #define IPROTO_KEY_MEMBER(s, v, ...) IPROTO_ ## s = v,
 
diff --git a/src/box/memtx_engine.cc b/src/box/memtx_engine.cc
index 286e84c28d..d1403b6da3 100644
--- a/src/box/memtx_engine.cc
+++ b/src/box/memtx_engine.cc
@@ -1326,6 +1326,42 @@ send_join_meta(struct xstream *stream, const struct raft_request *raft_req,
 	xstream_write(stream, &row);
 }
 
+#if defined(ENABLE_FETCH_SNAPSHOT_CURSOR)
+#include "memtx_checkpoint_join.cc"
+#else /* !defined(ENABLE_FETCH_SNAPSHOT_CURSOR) */
+
+static int
+memtx_engine_prepare_checkpoint_join(struct engine *engine,
+				     struct engine_join_ctx *ctx)
+{
+	(void)engine;
+	(void)ctx;
+	diag_set(ClientError, ER_UNSUPPORTED, "Community edition",
+		 "checkpoint join");
+	return -1; /* Stub: always fails with ER_UNSUPPORTED. */
+}
+
+static int
+memtx_engine_checkpoint_join(struct engine *engine, struct engine_join_ctx *ctx,
+			     struct xstream *stream)
+{
+	(void)engine;
+	(void)ctx;
+	(void)stream;
+	unreachable(); /* The prepare stub always fails in this build. */
+	return -1;
+}
+
+static void
+memtx_engine_complete_checkpoint_join(struct engine *engine,
+				      struct engine_join_ctx *ctx)
+{
+	(void)engine;
+	(void)ctx; /* No-op: the prepare stub allocates nothing. */
+}
+
+#endif /* !defined(ENABLE_FETCH_SNAPSHOT_CURSOR) */
+
 /** Space filter for replica join. */
 static bool
 memtx_join_space_filter(struct space *space, void *arg)
@@ -1339,6 +1375,9 @@ memtx_join_space_filter(struct space *space, void *arg)
 static int
 memtx_engine_prepare_join(struct engine *engine, struct engine_join_ctx *arg)
 {
+	if (arg->cursor != NULL)
+		return memtx_engine_prepare_checkpoint_join(engine, arg);
+
 	struct memtx_join_ctx *ctx =
 		(struct memtx_join_ctx *)malloc(sizeof(*ctx));
 	if (ctx == NULL) {
@@ -1425,6 +1464,9 @@ static int
 memtx_engine_join(struct engine *engine, struct engine_join_ctx *arg,
 		  struct xstream *stream)
 {
+	if (arg->cursor != NULL)
+		return memtx_engine_checkpoint_join(engine, arg, stream);
+
 	struct memtx_join_ctx *ctx =
 		(struct memtx_join_ctx *)arg->data[engine->id];
 	ctx->stream = stream;
@@ -1480,6 +1522,9 @@ memtx_engine_join(struct engine *engine, struct engine_join_ctx *arg,
 static void
 memtx_engine_complete_join(struct engine *engine, struct engine_join_ctx *arg)
 {
+	if (arg->cursor != NULL)
+		return memtx_engine_complete_checkpoint_join(engine, arg);
+
 	struct memtx_join_ctx *ctx =
 		(struct memtx_join_ctx *)arg->data[engine->id];
 	read_view_close(&ctx->rv);
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 8733940081..273b908e0f 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -441,7 +441,8 @@ relay_cord_init(struct relay *relay)
 
 void
 relay_initial_join(struct iostream *io, uint64_t sync, struct vclock *vclock,
-		   uint32_t replica_version_id)
+		   uint32_t replica_version_id,
+		   struct checkpoint_cursor *cursor)
 {
 	struct relay *relay = relay_new(NULL);
 	if (relay == NULL)
@@ -463,6 +464,7 @@ relay_initial_join(struct iostream *io, uint64_t sync, struct vclock *vclock,
 	 */
 	ctx.send_meta = replica_version_id > 0;
 	ctx.vclock = vclock;
+	ctx.cursor = cursor;
 	engine_prepare_join_xc(&ctx);
 	auto join_guard = make_scoped_guard([&] {
 		engine_complete_join(&ctx);
diff --git a/src/box/relay.h b/src/box/relay.h
index 87b7d7c591..335d3e8bdc 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -42,6 +42,7 @@ struct relay;
 struct replica;
 struct tt_uuid;
 struct vclock;
+struct checkpoint_cursor;
 
 enum relay_state {
 	/**
@@ -122,10 +123,12 @@ relay_push_raft(struct relay *relay, const struct raft_request *req);
  * @param sync      sync from incoming JOIN request
  * @param vclock[out] vclock of the read view sent to the replica
  * @param replica_version_id peer's version
+ * @param cursor    cursor for checkpoint join, if NULL - read-view join.
  */
 void
 relay_initial_join(struct iostream *io, uint64_t sync, struct vclock *vclock,
-		   uint32_t replica_version_id);
+		   uint32_t replica_version_id,
+		   struct checkpoint_cursor *cursor);
 
 /**
  * Send final JOIN rows to the replica.
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index e33763bbe8..1c5525077c 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -3029,6 +3029,41 @@ struct vy_join_ctx {
 	struct vy_read_view *rv;
 };
 
+#if defined(ENABLE_FETCH_SNAPSHOT_CURSOR)
+#include "vinyl_checkpoint_join.c"
+#else /* !defined(ENABLE_FETCH_SNAPSHOT_CURSOR) */
+
+static int
+vinyl_engine_prepare_checkpoint_join(struct engine *engine,
+				     struct engine_join_ctx *ctx)
+{
+	(void)engine;
+	(void)ctx;
+	unreachable(); /* NOTE(review): assumes memtx stub fails first. */
+	return -1;
+}
+
+static int
+vinyl_engine_checkpoint_join(struct engine *engine, struct engine_join_ctx *ctx,
+			     struct xstream *stream)
+{
+	(void)engine;
+	(void)ctx;
+	(void)stream;
+	unreachable(); /* NOTE(review): prepare should never succeed here. */
+	return -1;
+}
+
+static void
+vinyl_engine_complete_checkpoint_join(struct engine *engine,
+				      struct engine_join_ctx *ctx)
+{
+	(void)engine;
+	(void)ctx; /* No-op: the prepare stub allocates nothing. */
+}
+
+#endif /* !defined(ENABLE_FETCH_SNAPSHOT_CURSOR) */
+
 static int
 vy_join_add_space(struct space *space, void *arg)
 {
@@ -3063,6 +3098,9 @@ vy_join_add_space(struct space *space, void *arg)
 static int
 vinyl_engine_prepare_join(struct engine *engine, struct engine_join_ctx *arg)
 {
+	if (arg->cursor != NULL)
+		return vinyl_engine_prepare_checkpoint_join(engine, arg);
+
 	struct vy_env *env = vy_env(engine);
 	struct vy_join_ctx *ctx = malloc(sizeof(*ctx));
 	if (ctx == NULL) {
@@ -3104,6 +3142,9 @@ static int
 vinyl_engine_join(struct engine *engine, struct engine_join_ctx *arg,
 		  struct xstream *stream)
 {
+	if (arg->cursor != NULL)
+		return vinyl_engine_checkpoint_join(engine, arg, stream);
+
 	int loops = 0;
 	struct vy_join_ctx *ctx = arg->data[engine->id];
 	struct vy_join_entry *join_entry;
@@ -3129,6 +3170,9 @@ vinyl_engine_join(struct engine *engine, struct engine_join_ctx *arg,
 static void
 vinyl_engine_complete_join(struct engine *engine, struct engine_join_ctx *arg)
 {
+	if (arg->cursor != NULL)
+		return vinyl_engine_complete_checkpoint_join(engine, arg);
+
 	struct vy_env *env = vy_env(engine);
 	struct vy_join_ctx *ctx = arg->data[engine->id];
 	struct vy_join_entry *entry, *next;
diff --git a/src/box/xrow.c b/src/box/xrow.c
index e7f1dcf20f..dc3c6ad9b7 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -2093,6 +2093,12 @@ struct replication_request {
 	uint32_t *version_id;
 	/** IPROTO_REPLICA_ANON. */
 	bool *is_anon;
+	/** IPROTO_IS_CHECKPOINT_JOIN. */
+	bool *is_checkpoint_join;
+	/** IPROTO_CHECKPOINT_VCLOCK. */
+	struct vclock *checkpoint_vclock;
+	/** IPROTO_CHECKPOINT_LSN. */
+	uint64_t *checkpoint_lsn;
 };
 
 /** Encode a replication request template. */
@@ -2288,6 +2294,36 @@ id_filter_decode_err:		xrow_on_decode_err(row, ER_INVALID_MSGPACK,
 				*req->id_filter |= 1 << val;
 			}
 			break;
+		case IPROTO_IS_CHECKPOINT_JOIN:
+			if (req->is_checkpoint_join == NULL)
+				goto skip;
+			if (mp_typeof(*d) != MP_BOOL) {
+				xrow_on_decode_err(
+					row, ER_INVALID_MSGPACK,
+					"invalid IS_CHECKPOINT_JOIN");
+				return -1;
+			}
+			*req->is_checkpoint_join = mp_decode_bool(&d);
+			break;
+		case IPROTO_CHECKPOINT_VCLOCK:
+			if (req->checkpoint_vclock == NULL)
+				goto skip;
+			if (mp_decode_vclock(&d, req->checkpoint_vclock) != 0) {
+				xrow_on_decode_err(row, ER_INVALID_MSGPACK,
+						   "invalid CHECKPOINT_VCLOCK");
+				return -1;
+			}
+			break;
+		case IPROTO_CHECKPOINT_LSN:
+			if (req->checkpoint_lsn == NULL)
+				goto skip;
+			if (mp_typeof(*d) != MP_UINT) {
+				xrow_on_decode_err(row, ER_INVALID_MSGPACK,
+						   "invalid CHECKPOINT_LSN");
+				return -1;
+			}
+			*req->checkpoint_lsn = mp_decode_uint(&d);
+			break;
 		default: skip:
 			mp_next(&d); /* value */
 		}
@@ -2400,7 +2436,16 @@ xrow_decode_fetch_snapshot(const struct xrow_header *row,
 	memset(req, 0, sizeof(*req));
 	struct replication_request base_req = {
 		.version_id = &req->version_id,
+		.is_checkpoint_join = &req->is_checkpoint_join,
+		.checkpoint_vclock = &req->checkpoint_vclock,
+		.checkpoint_lsn = &req->checkpoint_lsn,
 	};
+	/*
+	 * The vclock must be cleared explicitly: clearing sets its signature
+	 * to -1, which the memset above cannot do. This lets us distinguish
+	 * an uninitialized vclock from a zero one.
+	 */
+	vclock_clear(&req->checkpoint_vclock);
 	return xrow_decode_replication_request(row, &base_req);
 }
 
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 8b9dc7d4ad..979154b9c9 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -670,6 +670,12 @@ xrow_decode_join(const struct xrow_header *row, struct join_request *req);
 struct fetch_snapshot_request {
 	/** Replica's version. */
 	uint32_t version_id;
+	/** True for a checkpoint (file) join, false for a read-view join. */
+	bool is_checkpoint_join;
+	/** Snapshot signature; left cleared by the decoder if absent. */
+	struct vclock checkpoint_vclock;
+	/** Checkpoint's lsn to start from; rows >= it are sent. */
+	uint64_t checkpoint_lsn;
 };
 
 /** Encode FETCH_SNAPSHOT request. */
diff --git a/src/trivia/config.h.cmake b/src/trivia/config.h.cmake
index b717eaf0a1..03b01756ca 100644
--- a/src/trivia/config.h.cmake
+++ b/src/trivia/config.h.cmake
@@ -293,6 +293,7 @@
 #cmakedefine ENABLE_CONFIG_EXTRAS 1
 #cmakedefine ENABLE_FAILOVER 1
 #cmakedefine ENABLE_INTEGRITY 1
+#cmakedefine ENABLE_FETCH_SNAPSHOT_CURSOR 1
 
 #cmakedefine EXPORT_LIBCURL_SYMBOLS 1
 
-- 
GitLab