From b5e774e646a89ea3396bee9992bbc9206527205c Mon Sep 17 00:00:00 2001
From: Alexandr Lyapunov <a.lyapunov@corp.mail.ru>
Date: Fri, 13 Nov 2015 18:32:26 +0300
Subject: [PATCH] fixed gh-859 : sc_schema_id in iproto

---
 src/box/box.cc             |  2 +-
 src/box/errcode.h          |  1 +
 src/box/iproto.cc          |  8 +++++
 src/box/iproto_constants.c |  2 +-
 src/box/iproto_constants.h |  3 +-
 src/box/iproto_port.cc     | 15 ++++++--
 src/box/request.h          |  2 +-
 src/box/xrow.cc            |  3 ++
 src/box/xrow.h             |  1 +
 test/box-py/iproto.result  | 17 +++++++++
 test/box-py/iproto.test.py | 71 ++++++++++++++++++++++++++++++++++++++
 test/box/misc.result       |  1 +
 test/box/socket.result     | 13 ++++---
 test/box/socket.test.lua   |  3 +-
 14 files changed, 130 insertions(+), 12 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index f1083ae99f..c897ac0b5a 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -435,7 +435,7 @@ box_set_panic_on_wal_error(void)
  * @note Since this is for internal use, it has
  * no boundary or misuse checks.
  */
-void
+static void
 boxk(enum iproto_type type, uint32_t space_id, const char *format, ...)
 {
 	struct request req;
diff --git a/src/box/errcode.h b/src/box/errcode.h
index 3ceadd5ef4..0efecd203e 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -162,6 +162,7 @@ struct errcode_record {
 	/*106 */_(ER_WRONG_INDEX_RECORD,  2, "Wrong record in _index space: got {%s}, expected {%s}") \
 	/*107 */_(ER_WRONG_INDEX_PARTS, 2, "Wrong index parts (field %u): %s; expected field1 id (number), field1 type (string), ...") \
 	/*108 */_(ER_WRONG_INDEX_OPTIONS, 2, "Wrong index options (field %u): %s") \
+	/*109 */_(ER_WRONG_SCHEMA_VERSION, 2, "Wrong schema version, current: %d, in request: %u") \
 
 /*
  * !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 33c89d6ae0..80112ce65f 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -620,6 +620,8 @@ iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher,
 	}
 }
 
+extern int sc_version;
+
 static void
 tx_process_msg(struct cmsg *m)
 {
@@ -632,6 +634,12 @@ tx_process_msg(struct cmsg *m)
 
 	session->sync = msg->header.sync;
 	try {
+		if (msg->header.schema_id &&
+		    msg->header.schema_id != sc_version) {
+			tnt_raise(ClientError, ER_WRONG_SCHEMA_VERSION,
+				  sc_version, msg->header.schema_id);
+		}
+
 		switch (msg->header.type) {
 		case IPROTO_SELECT:
 		{
diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
index b7d29fea26..1503cbd189 100644
--- a/src/box/iproto_constants.c
+++ b/src/box/iproto_constants.c
@@ -38,10 +38,10 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] =
 		/* 0x02 */	MP_UINT,   /* IPROTO_SERVER_ID */
 		/* 0x03 */	MP_UINT,   /* IPROTO_LSN */
 		/* 0x04 */	MP_DOUBLE, /* IPROTO_TIMESTAMP */
+		/* 0x05 */	MP_UINT,   /* IPROTO_SCHEMA_ID */
 	/* }}} */
 
 	/* {{{ unused */
-		/* 0x05 */	MP_UINT,
 		/* 0x06 */	MP_UINT,
 		/* 0x07 */	MP_UINT,
 		/* 0x08 */	MP_UINT,
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index a33b71b499..ec73523b8e 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -54,6 +54,7 @@ enum iproto_key {
 	IPROTO_SERVER_ID = 0x02,
 	IPROTO_LSN = 0x03,
 	IPROTO_TIMESTAMP = 0x04,
+	IPROTO_SCHEMA_ID = 0x05,
 	/* Leave a gap for other keys in the header. */
 	IPROTO_SPACE_ID = 0x10,
 	IPROTO_INDEX_ID = 0x11,
@@ -81,7 +82,7 @@ enum iproto_key {
 #define bit(c) (1ULL<<IPROTO_##c)
 
 #define IPROTO_HEAD_BMAP (bit(REQUEST_TYPE) | bit(SYNC) | bit(SERVER_ID) |\
-			  bit(LSN))
+			  bit(LSN) | bit(SCHEMA_ID))
 #define IPROTO_BODY_BMAP (bit(SPACE_ID) | bit(INDEX_ID) | bit(LIMIT) |\
 			  bit(OFFSET) | bit(ITERATOR) | bit(INDEX_BASE) |\
 			  bit(KEY) | bit(TUPLE) | bit(FUNCTION_NAME) | \
diff --git a/src/box/iproto_port.cc b/src/box/iproto_port.cc
index 76a19a9362..0d8a6621ab 100644
--- a/src/box/iproto_port.cc
+++ b/src/box/iproto_port.cc
@@ -40,12 +40,18 @@ struct iproto_header_bin {
 	uint8_t m_code;                         /* MP_UINT32 */
 	uint32_t v_code;                        /* response status */
 	uint8_t k_sync;                         /* IPROTO_SYNC */
-	uint8_t m_sync;                         /* MP_UIN64 */
+	uint8_t m_sync;                         /* MP_UINT64 */
 	uint64_t v_sync;                        /* sync */
+	uint8_t k_schema_id;                    /* IPROTO_SCHEMA_ID */
+	uint8_t m_schema_id;                    /* MP_UINT32 */
+	uint32_t v_schema_id;                   /* schema_id */
 } __attribute__((packed));
 
 static const struct iproto_header_bin iproto_header_bin = {
-	0xce, 0, 0x82, IPROTO_REQUEST_TYPE, 0xce, 0, IPROTO_SYNC, 0xcf, 0
+	0xce, 0, 0x83,
+	IPROTO_REQUEST_TYPE, 0xce, 0,
+	IPROTO_SYNC, 0xcf, 0,
+	IPROTO_SCHEMA_ID, 0xce, 0
 };
 
 struct iproto_body_bin {
@@ -63,6 +69,8 @@ static const struct iproto_body_bin iproto_error_bin = {
 	0x81, IPROTO_ERROR, 0xdb, 0
 };
 
+extern int sc_version;
+
 /** Return a 4-byte numeric error code, with status flags. */
 static inline uint32_t
 iproto_encode_error(uint32_t error)
@@ -76,6 +84,7 @@ iproto_reply_ok(struct obuf *out, uint64_t sync)
 	struct iproto_header_bin reply = iproto_header_bin;
 	reply.v_len = mp_bswap_u32(sizeof(iproto_header_bin) - 5 + 1);
 	reply.v_sync = mp_bswap_u64(sync);
+	reply.v_schema_id = mp_bswap_u32(sc_version);
 	uint8_t empty_map[1] = { 0x80 };
 	obuf_dup_xc(out, &reply, sizeof(reply));
 	obuf_dup_xc(out, &empty_map, sizeof(empty_map));
@@ -94,6 +103,7 @@ iproto_reply_error(struct obuf *out, const struct error *e, uint64_t sync)
 	header.v_len = mp_bswap_u32(len);
 	header.v_code = mp_bswap_u32(iproto_encode_error(errcode));
 	header.v_sync = mp_bswap_u64(sync);
+	header.v_schema_id = mp_bswap_u32(sc_version);
 
 	body.v_data_len = mp_bswap_u32(msg_len);
 
@@ -137,6 +147,7 @@ iproto_reply_select(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
 	struct iproto_header_bin header = iproto_header_bin;
 	header.v_len = mp_bswap_u32(len);
 	header.v_sync = mp_bswap_u64(sync);
+	header.v_schema_id = mp_bswap_u32(sc_version);
 
 	struct iproto_body_bin body = iproto_body_bin;
 	body.v_data_len = mp_bswap_u32(count);
diff --git a/src/box/request.h b/src/box/request.h
index bde3a3495a..aeb4ed90cc 100644
--- a/src/box/request.h
+++ b/src/box/request.h
@@ -96,7 +96,7 @@ request_encode(struct request *request, struct iovec *iov);
  */
 void
 request_rebind_to_primary_key(struct request *request, struct space *space,
-				struct tuple *found_tuple);
+			      struct tuple *found_tuple);
 
 /**
  * API of C stored function.
diff --git a/src/box/xrow.cc b/src/box/xrow.cc
index a5f96614d8..6a6b74e076 100644
--- a/src/box/xrow.cc
+++ b/src/box/xrow.cc
@@ -76,6 +76,9 @@ xrow_header_decode(struct xrow_header *header, const char **pos,
 		case IPROTO_TIMESTAMP:
 			header->tm = mp_decode_double(pos);
 			break;
+		case IPROTO_SCHEMA_ID:
+			header->schema_id = mp_decode_uint(pos);
+			break;
 		default:
 			/* unknown header */
 			mp_next(pos);
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 769ceece3a..b7cceafeeb 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -56,6 +56,7 @@ struct xrow_header {
 	double tm;
 
 	int bodycnt;
+	uint32_t schema_id;
 	struct iovec body[XROW_BODY_IOVMAX];
 };
 
diff --git a/test/box-py/iproto.result b/test/box-py/iproto.result
index 678026f3a2..0425bf53fd 100644
--- a/test/box-py/iproto.result
+++ b/test/box-py/iproto.result
@@ -123,9 +123,26 @@ STR 65536
 --
 0xdb00010000 => ok ok ok
 
+Test of schema_id in iproto.
+Normal connect done w/o errors: True
+Got schema_id: True
+Zero-schema_id connect done w/o errors: True
+Same schema_id: True
+Normal connect done w/o errors: True
+Same schema_id: True
+Wrong schema_id leads to error: True
+Same schema_id: True
+space2 = box.schema.create_space('test2')
+---
+...
+Schema changed -> error: True
+Got another schema_id: True
 space:drop()
 ---
 ...
+space2:drop()
+---
+...
 box.schema.user.revoke('guest', 'read,write,execute', 'universe')
 ---
 ...
diff --git a/test/box-py/iproto.test.py b/test/box-py/iproto.test.py
index 85545ee902..fc9735c08f 100644
--- a/test/box-py/iproto.test.py
+++ b/test/box-py/iproto.test.py
@@ -209,5 +209,76 @@ for test in TESTS:
         print
     print
 
+
+print 'Test of schema_id in iproto.'
+c = Connection('localhost', server.iproto.port)
+c.connect()
+s = c._socket
+
+def test_request(req_header, req_body):
+    query_header = msgpack.dumps(req_header)
+    query_body = msgpack.dumps(req_body)
+    packet_len = len(query_header) + len(query_body)
+    query = msgpack.dumps(packet_len) + query_header + query_body
+    try:
+        s.send(query)
+    except OSError as e:
+        print '   => ', 'Failed to send request'
+    resp_len = ''
+    resp_headerbody = ''
+    resp_header = {}
+    resp_body = {}
+    try:
+        resp_len = s.recv(5)
+        resp_len = msgpack.loads(resp_len)
+        resp_headerbody = s.recv(resp_len)
+        unpacker = msgpack.Unpacker(use_list = True)
+        unpacker.feed(resp_headerbody)
+        resp_header = unpacker.unpack()
+        resp_body = unpacker.unpack()
+    except OSError as e:
+        print '   => ', 'Failed to recv response'
+    res = {}
+    res['header'] = resp_header
+    res['body'] = resp_body
+    return res
+
+header = { IPROTO_CODE : REQUEST_TYPE_SELECT}
+body = { IPROTO_SPACE_ID: space_id,
+    IPROTO_INDEX_ID: 0,
+    IPROTO_KEY: [],
+    IPROTO_ITERATOR: 2,
+    IPROTO_OFFSET: 0,
+    IPROTO_LIMIT: 1 }
+resp = test_request(header, body)
+print 'Normal connect done w/o errors:', resp['header'][0] == 0
+print 'Got schema_id:', resp['header'][5] > 0
+schema_id = resp['header'][5]
+
+header = { IPROTO_CODE : REQUEST_TYPE_SELECT, 5 : 0 }
+resp = test_request(header, body)
+print 'Zero-schema_id connect done w/o errors:', resp['header'][0] == 0
+print 'Same schema_id:', resp['header'][5] == schema_id
+
+header = { IPROTO_CODE : REQUEST_TYPE_SELECT, 5 : schema_id }
+resp = test_request(header, body)
+print 'Normal connect done w/o errors:', resp['header'][0] == 0
+print 'Same schema_id:', resp['header'][5] == schema_id
+
+header = { IPROTO_CODE : REQUEST_TYPE_SELECT, 5 : schema_id + 1 }
+resp = test_request(header, body)
+print 'Wrong schema_id leads to error:', resp['header'][0] != 0
+print 'Same schema_id:', resp['header'][5] == schema_id
+
+admin("space2 = box.schema.create_space('test2')")
+
+header = { IPROTO_CODE : REQUEST_TYPE_SELECT, 5 : schema_id }
+resp = test_request(header, body)
+print 'Schema changed -> error:', resp['header'][0] != 0
+print 'Got another schema_id:', resp['header'][5] != schema_id
+
+c.close()
+
 admin("space:drop()")
+admin("space2:drop()")
 admin("box.schema.user.revoke('guest', 'read,write,execute', 'universe')")
diff --git a/test/box/misc.result b/test/box/misc.result
index 754ef18b47..ce202f89d7 100644
--- a/test/box/misc.result
+++ b/test/box/misc.result
@@ -275,6 +275,7 @@ t;
   - 'box.error.ROLE_LOOP : 87'
   - 'box.error.TUPLE_NOT_FOUND : 4'
   - 'box.error.injection : table: <address>
+  - 'box.error.WRONG_SCHEMA_VERSION : 109'
   - 'box.error.ROLE_EXISTS : 83'
   - 'box.error.WRONG_INDEX_PARTS : 107'
   - 'box.error.DROP_USER : 44'
diff --git a/test/box/socket.result b/test/box/socket.result
index 555dd45069..ddfac443e0 100644
--- a/test/box/socket.result
+++ b/test/box/socket.result
@@ -168,17 +168,20 @@ pong = s:sysread()
 ...
 string.len(pong)
 ---
-- 23
+- 29
 ...
 msgpack.decode(pong)
 ---
-- 18
+- 24
 - 6
 ...
-msgpack.decode(pong, 6)
+function remove_schema_id(t, x) if t[5] then t[5] = 'XXX' end return t, x end
 ---
-- {0: 0, 1: 0}
-- 23
+...
+remove_schema_id(msgpack.decode(pong, 6))
+---
+- {0: 0, 1: 0, 5: 'XXX'}
+- 29
 ...
 s:close()
 ---
diff --git a/test/box/socket.test.lua b/test/box/socket.test.lua
index 99823b65c3..7b460d58cd 100644
--- a/test/box/socket.test.lua
+++ b/test/box/socket.test.lua
@@ -61,7 +61,8 @@ s:wait(.01)
 pong = s:sysread()
 string.len(pong)
 msgpack.decode(pong)
-msgpack.decode(pong, 6)
+function remove_schema_id(t, x) if t[5] then t[5] = 'XXX' end return t, x end
+remove_schema_id(msgpack.decode(pong, 6))
 
 s:close()
 
-- 
GitLab