From f1c2127d4367fe4663b76aac4197012d5d393fd9 Mon Sep 17 00:00:00 2001
From: Serge Petrenko <sergepetrenko@tarantool.org>
Date: Mon, 28 Jun 2021 16:46:13 +0300
Subject: [PATCH] replication: add META stage to JOIN

The new META stage is part of server's response to a join request.
It's marked by IPROTO_JOIN_META and IPROTO_JOIN_SNAPSHOT requests and goes
before the actual snapshot data.

Prerequisite #6034

@TarantoolBot document
Title: new protocol stage during JOIN

A new stage is added to the stream of JOIN rows coming from master.
The stage is marked with a bodyless row with type
IPROTO_JOIN_META = 71
Once all the rows from the stage are sent out, the JOIN continues as
before (as a stream of snapshot rows). The end of META stage is marked
with a row of type IPROTO_JOIN_SNAPSHOT = 72

The stage contains the rows that are necessary for instance
initialization (current Raft term, current state of synchronous
transaction queue), but do not belong to any system space.
---
 src/box/applier.cc         | 17 ++++++++++++++++-
 src/box/box.cc             |  7 ++++---
 src/box/iproto_constants.h |  2 ++
 src/box/relay.cc           | 19 ++++++++++++++++++-
 src/box/relay.h            |  4 +++-
 src/box/xrow.c             |  7 +++++++
 src/box/xrow.h             |  8 ++++++++
 7 files changed, 58 insertions(+), 6 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index a9b6cacbc5..2a6fe8b245 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -447,12 +447,26 @@ applier_wait_snapshot(struct applier *applier)
 		xrow_decode_vclock_xc(&row, &replicaset.vclock);
 	}
 
+	coio_read_xrow(coio, ibuf, &row);
+	if (row.type == IPROTO_JOIN_META) {
+		/* Read additional metadata. Empty at the moment. */
+		do {
+			coio_read_xrow(coio, ibuf, &row);
+			if (iproto_type_is_error(row.type)) {
+				xrow_decode_error_xc(&row);
+			} else if (row.type != IPROTO_JOIN_SNAPSHOT) {
+				tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE,
+					  (uint32_t)row.type);
+			}
+		} while (row.type != IPROTO_JOIN_SNAPSHOT);
+		coio_read_xrow(coio, ibuf, &row);
+	}
+
 	/*
 	 * Receive initial data.
 	 */
 	uint64_t row_count = 0;
 	while (true) {
-		coio_read_xrow(coio, ibuf, &row);
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
 			if (apply_snapshot_row(&row) != 0)
@@ -477,6 +491,7 @@ applier_wait_snapshot(struct applier *applier)
 			tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE,
 				  (uint32_t) row.type);
 		}
+		coio_read_xrow(coio, ibuf, &row);
 	}
 
 	return row_count;
diff --git a/src/box/box.cc b/src/box/box.cc
index 692c5274ac..9dbeee5db4 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2458,7 +2458,7 @@ box_process_fetch_snapshot(struct ev_io *io, struct xrow_header *header)
 
 	/* Send the snapshot data to the instance. */
 	struct vclock start_vclock;
-	relay_initial_join(io->fd, header->sync, &start_vclock);
+	relay_initial_join(io->fd, header->sync, &start_vclock, 0);
 	say_info("read-view sent.");
 
 	/* Remember master's vclock after the last request */
@@ -2606,7 +2606,7 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
 
 	/* Decode JOIN request */
 	struct tt_uuid instance_uuid = uuid_nil;
-	uint32_t replica_version_id;
+	uint32_t replica_version_id = 0;
 	xrow_decode_join_xc(header, &instance_uuid, &replica_version_id);
 
 	/* Check that bootstrap has been finished */
@@ -2656,7 +2656,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
 	 * Initial stream: feed replica with dirty data from engines.
 	 */
 	struct vclock start_vclock;
-	relay_initial_join(io->fd, header->sync, &start_vclock);
+	relay_initial_join(io->fd, header->sync, &start_vclock,
+			   replica_version_id);
 	say_info("initial data sent.");
 
 	/**
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index ea7290da6d..758cd002c9 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -262,6 +262,8 @@ enum iproto_type {
 	IPROTO_FETCH_SNAPSHOT = 69,
 	/** REGISTER request to leave anonymous replication. */
 	IPROTO_REGISTER = 70,
+	IPROTO_JOIN_META = 71,
+	IPROTO_JOIN_SNAPSHOT = 72,
 
 	/** Vinyl run info stored in .index file */
 	VY_INDEX_RUN_INFO = 100,
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 60f527b7f7..7f2dc368f6 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -392,7 +392,8 @@ relay_set_cord_name(int fd)
 }
 
 void
-relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
+relay_initial_join(int fd, uint64_t sync, struct vclock *vclock,
+		   uint32_t replica_version_id)
 {
 	struct relay *relay = relay_new(NULL);
 	if (relay == NULL)
@@ -432,6 +433,22 @@ relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
 	row.sync = sync;
 	coio_write_xrow(&relay->io, &row);
 
+	/*
+	 * Version is present starting with 2.7.3, 2.8.2, 2.9.1
+	 * All these versions know of additional META stage of initial join.
+	 */
+	if (replica_version_id > 0) {
+		/* Mark the beginning of the metadata stream. */
+		xrow_encode_type(&row, IPROTO_JOIN_META);
+		xstream_write(&relay->stream, &row);
+
+		/* Empty at the moment. */
+
+		/* Mark the end of the metadata stream. */
+		xrow_encode_type(&row, IPROTO_JOIN_SNAPSHOT);
+		xstream_write(&relay->stream, &row);
+	}
+
 	/* Send read view to the replica. */
 	engine_join_xc(&ctx, &relay->stream);
 }
diff --git a/src/box/relay.h b/src/box/relay.h
index 615ffb75d3..112428ae8f 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -116,9 +116,11 @@ relay_push_raft(struct relay *relay, const struct raft_request *req);
  * @param fd        client connection
  * @param sync      sync from incoming JOIN request
  * @param vclock[out] vclock of the read view sent to the replica
+ * @param replica_version_id peer's version
  */
 void
-relay_initial_join(int fd, uint64_t sync, struct vclock *vclock);
+relay_initial_join(int fd, uint64_t sync, struct vclock *vclock,
+		   uint32_t replica_version_id);
 
 /**
  * Send final JOIN rows to the replica.
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 5c5da48085..8ab8b27687 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -1730,6 +1730,13 @@ xrow_encode_timestamp(struct xrow_header *row, uint32_t replica_id, double tm)
 	row->tm = tm;
 }
 
+void
+xrow_encode_type(struct xrow_header *row, uint16_t type)
+{
+	memset(row, 0, sizeof(*row));
+	row->type = type;
+}
+
 void
 greeting_encode(char *greetingbuf, uint32_t version_id,
 		const struct tt_uuid *uuid, const char *salt, uint32_t salt_len)
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 30d6b8639c..c6e8ed0fd0 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -570,6 +570,14 @@ xrow_decode_subscribe_response(struct xrow_header *row,
 void
 xrow_encode_timestamp(struct xrow_header *row, uint32_t replica_id, double tm);
 
+/**
+ * Encode any bodyless message.
+ * @param row[out] Row to encode into.
+ * @param type Message type.
+ */
+void
+xrow_encode_type(struct xrow_header *row, uint16_t type);
+
 /**
  * Fast encode xrow header using the specified header fields.
  * It is faster than the xrow_header_encode, because uses
-- 
GitLab