diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c index addda39dc61db3be0b58c022914bd03d1524aa0d..f2902946a43bae8166ab9166e3efe42e03f23bfa 100644 --- a/src/box/iproto_constants.c +++ b/src/box/iproto_constants.c @@ -43,10 +43,10 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] = /* 0x07 */ MP_UINT, /* IPROTO_GROUP_ID */ /* 0x08 */ MP_UINT, /* IPROTO_TSN */ /* 0x09 */ MP_UINT, /* IPROTO_FLAGS */ + /* 0x0a */ MP_UINT, /* IPROTO_STREAM_ID */ /* }}} */ /* {{{ unused */ - /* 0x0a */ MP_UINT, /* 0x0b */ MP_UINT, /* 0x0c */ MP_UINT, /* 0x0d */ MP_UINT, @@ -198,7 +198,7 @@ const char *iproto_key_strs[IPROTO_KEY_MAX] = { "group id", /* 0x07 */ "tsn", /* 0x08 */ "flags", /* 0x09 */ - NULL, /* 0x0a */ + "stream_id", /* 0x0a */ NULL, /* 0x0b */ NULL, /* 0x0c */ NULL, /* 0x0d */ diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 3d78ce2bbe1ca36e43654677a76f64816b062767..b9498868cb319adc4b799ee9e8e04c5383d4d4b0 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -72,6 +72,7 @@ enum iproto_key { IPROTO_GROUP_ID = 0x07, IPROTO_TSN = 0x08, IPROTO_FLAGS = 0x09, + IPROTO_STREAM_ID = 0x0a, /* Leave a gap for other keys in the header. */ IPROTO_SPACE_ID = 0x10, IPROTO_INDEX_ID = 0x11, diff --git a/src/box/xrow.c b/src/box/xrow.c index a61c6e3457baaef8f2374b79f0c22a04f264c3f9..7df1af4abbe0186b36ff5965212c73fb30dc1381 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -186,6 +186,9 @@ xrow_header_decode(struct xrow_header *header, const char **pos, flags = mp_decode_uint(pos); header->flags = flags; break; + case IPROTO_STREAM_ID: + header->stream_id = mp_decode_uint(pos); + break; default: /* unknown header */ mp_next(pos); @@ -319,6 +322,11 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync, flags_to_encode |= IPROTO_FLAG_COMMIT; } } + if (header->stream_id != 0) { + d = mp_encode_uint(d, IPROTO_STREAM_ID); + d = mp_encode_uint(d, header->stream_id); + map_size++; + } if (flags_to_encode != 0) { d = mp_encode_uint(d, IPROTO_FLAGS); d = mp_encode_uint(d, flags_to_encode); diff --git a/src/box/xrow.h b/src/box/xrow.h index 48b8b55f5e9b5135b65449645e9d5715bc94bc6a..cb83fddff3fec1814a9beb565cb4029e976c2963 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -81,6 +81,11 @@ struct xrow_header { * transaction. */ int64_t tsn; + /** + * Stream id. Used in iproto binary protocol to identify stream. + * Zero if stream is not used. + */ + uint64_t stream_id; /** Transaction meta flags set only in the last transaction row. */ union { uint8_t flags; diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc index b6018eed9bd7ef6de75ce6f989672a8eef7f1719..2c0dd88b6fab8833ec8bb79f582fa9bf91b420bf 100644 --- a/test/unit/xrow.cc +++ b/test/unit/xrow.cc @@ -220,8 +220,10 @@ test_xrow_header_encode_decode() header.bodycnt = 0; header.tsn = header.lsn; uint64_t sync = 100500; + uint64_t stream_id = 1; for (int opt_idx = 0; opt_idx < bit_comb_count; opt_idx++) { - plan(12); + plan(13); + header.stream_id = stream_id++; header.is_commit = opt_idx & 0x01; header.wait_sync = opt_idx >> 1 & 0x01; header.wait_ack = opt_idx >> 2 & 0x01; @@ -229,7 +231,7 @@ test_xrow_header_encode_decode() is(1, xrow_header_encode(&header, sync, vec, 200), "encode"); int fixheader_len = 200; pos = (char *)vec[0].iov_base + fixheader_len; - uint32_t exp_map_size = 5; + uint32_t exp_map_size = 6; /* * header.is_commit flag isn't encoded, since this row looks * like a single-statement transaction. @@ -249,6 +251,7 @@ test_xrow_header_encode_decode() end += vec[0].iov_len; is(xrow_header_decode(&decoded_header, &begin, end, true), 0, "header decode"); + is(header.stream_id, decoded_header.stream_id, "decoded stream_id"); is(header.is_commit, decoded_header.is_commit, "decoded is_commit"); is(header.wait_sync, decoded_header.wait_sync, "decoded wait_sync"); is(header.wait_ack, decoded_header.wait_ack, "decoded wait_ack"); diff --git a/test/unit/xrow.result b/test/unit/xrow.result index 3b705d5ba60351200c386c44a4979d670a6efd31..1ca222d37558e0dfb9a10bbe30025a12d6236371 100644 --- a/test/unit/xrow.result +++ b/test/unit/xrow.result @@ -43,117 +43,125 @@ ok 1 - subtests 1..9 ok 1 - bad msgpack end - 1..12 + 1..13 ok 1 - encode ok 2 - header map size ok 3 - header decode - ok 4 - decoded is_commit - ok 5 - decoded wait_sync - ok 6 - decoded wait_ack - ok 7 - decoded type - ok 8 - decoded replica_id - ok 9 - decoded lsn - ok 10 - decoded tm - ok 11 - decoded sync - ok 12 - decoded bodycnt + ok 4 - decoded stream_id + ok 5 - decoded is_commit + ok 6 - decoded wait_sync + ok 7 - decoded wait_ack + ok 8 - decoded type + ok 9 - decoded replica_id + ok 10 - decoded lsn + ok 11 - decoded tm + ok 12 - decoded sync + ok 13 - decoded bodycnt ok 2 - subtests - 1..12 + 1..13 ok 1 - encode ok 2 - header map size ok 3 - header decode - ok 4 - decoded is_commit - ok 5 - decoded wait_sync - ok 6 - decoded wait_ack - ok 7 - decoded type - ok 8 - decoded replica_id - ok 9 - decoded lsn - ok 10 - decoded tm - ok 11 - decoded sync - ok 12 - decoded bodycnt + ok 4 - decoded stream_id + ok 5 - decoded is_commit + ok 6 - decoded wait_sync + ok 7 - decoded wait_ack + ok 8 - decoded type + ok 9 - decoded replica_id + ok 10 - decoded lsn + ok 11 - decoded tm + ok 12 - decoded sync + ok 13 - decoded bodycnt ok 3 - subtests - 1..12 + 1..13 ok 1 - encode ok 2 - header map size ok 3 - header decode - ok 4 - decoded is_commit - ok 5 - decoded wait_sync - ok 6 - decoded wait_ack - ok 7 - decoded type - ok 8 - decoded replica_id - ok 9 - decoded lsn - ok 10 - decoded tm - ok 11 - decoded sync - ok 12 - decoded bodycnt + ok 4 - decoded stream_id + ok 5 - decoded is_commit + ok 6 - decoded wait_sync + ok 7 - decoded wait_ack + ok 8 - decoded type + ok 9 - decoded replica_id + ok 10 - decoded lsn + ok 11 - decoded tm + ok 12 - decoded sync + ok 13 - decoded bodycnt ok 4 - subtests - 1..12 + 1..13 ok 1 - encode ok 2 - header map size ok 3 - header decode - ok 4 - decoded is_commit - ok 5 - decoded wait_sync - ok 6 - decoded wait_ack - ok 7 - decoded type - ok 8 - decoded replica_id - ok 9 - decoded lsn - ok 10 - decoded tm - ok 11 - decoded sync - ok 12 - decoded bodycnt + ok 4 - decoded stream_id + ok 5 - decoded is_commit + ok 6 - decoded wait_sync + ok 7 - decoded wait_ack + ok 8 - decoded type + ok 9 - decoded replica_id + ok 10 - decoded lsn + ok 11 - decoded tm + ok 12 - decoded sync + ok 13 - decoded bodycnt ok 5 - subtests - 1..12 + 1..13 ok 1 - encode ok 2 - header map size ok 3 - header decode - ok 4 - decoded is_commit - ok 5 - decoded wait_sync - ok 6 - decoded wait_ack - ok 7 - decoded type - ok 8 - decoded replica_id - ok 9 - decoded lsn - ok 10 - decoded tm - ok 11 - decoded sync - ok 12 - decoded bodycnt + ok 4 - decoded stream_id + ok 5 - decoded is_commit + ok 6 - decoded wait_sync + ok 7 - decoded wait_ack + ok 8 - decoded type + ok 9 - decoded replica_id + ok 10 - decoded lsn + ok 11 - decoded tm + ok 12 - decoded sync + ok 13 - decoded bodycnt ok 6 - subtests - 1..12 + 1..13 ok 1 - encode ok 2 - header map size ok 3 - header decode - ok 4 - decoded is_commit - ok 5 - decoded wait_sync - ok 6 - decoded wait_ack - ok 7 - decoded type - ok 8 - decoded replica_id - ok 9 - decoded lsn - ok 10 - decoded tm - ok 11 - decoded sync - ok 12 - decoded bodycnt + ok 4 - decoded stream_id + ok 5 - decoded is_commit + ok 6 - decoded wait_sync + ok 7 - decoded wait_ack + ok 8 - decoded type + ok 9 - decoded replica_id + ok 10 - decoded lsn + ok 11 - decoded tm + ok 12 - decoded sync + ok 13 - decoded bodycnt ok 7 - subtests - 1..12 + 1..13 ok 1 - encode ok 2 - header map size ok 3 - header decode - ok 4 - decoded is_commit - ok 5 - decoded wait_sync - ok 6 - decoded wait_ack - ok 7 - decoded type - ok 8 - decoded replica_id - ok 9 - decoded lsn - ok 10 - decoded tm - ok 11 - decoded sync - ok 12 - decoded bodycnt + ok 4 - decoded stream_id + ok 5 - decoded is_commit + ok 6 - decoded wait_sync + ok 7 - decoded wait_ack + ok 8 - decoded type + ok 9 - decoded replica_id + ok 10 - decoded lsn + ok 11 - decoded tm + ok 12 - decoded sync + ok 13 - decoded bodycnt ok 8 - subtests - 1..12 + 1..13 ok 1 - encode ok 2 - header map size ok 3 - header decode - ok 4 - decoded is_commit - ok 5 - decoded wait_sync - ok 6 - decoded wait_ack - ok 7 - decoded type - ok 8 - decoded replica_id - ok 9 - decoded lsn - ok 10 - decoded tm - ok 11 - decoded sync - ok 12 - decoded bodycnt + ok 4 - decoded stream_id + ok 5 - decoded is_commit + ok 6 - decoded wait_sync + ok 7 - decoded wait_ack + ok 8 - decoded type + ok 9 - decoded replica_id + ok 10 - decoded lsn + ok 11 - decoded tm + ok 12 - decoded sync + ok 13 - decoded bodycnt ok 9 - subtests ok 2 - subtests 1..1