-
Vladimir Davydov authored
xrow_header_decode() checks that the packet body is a valid msgpack and there's no outstanding data beyond the packet end so we don't need to redo these checks while decoding the body (e.g. in xrow_decode_dml()).
Vladimir Davydov authoredxrow_header_decode() checks that the packet body is a valid msgpack and there's no outstanding data beyond the packet end so we don't need to redo this checks while decoding the body (e.g. in xrow_decode_dml()).
xrow.c 51.43 KiB
/*
* Copyright 2010-2016, Tarantool AUTHORS, please see AUTHORS file.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* 1. Redistributions of source code must retain the above
* copyright notice, this list of conditions and the
* following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#include "xrow.h"
#include <msgpuck.h>
#include <small/region.h>
#include <small/obuf.h>
#include <base64.h>
#include "fiber.h"
#include "version.h"
#include "tt_static.h"
#include "error.h"
#include "mp_error.h"
#include "scramble.h"
#include "iproto_constants.h"
#include "iproto_features.h"
#include "mpstream/mpstream.h"
#include "errinj.h"
/*
 * Compile-time check that the IPROTO_DATA, IPROTO_METADATA and
 * IPROTO_SQL_INFO keys are all below 0x7f, i.e. each of them is
 * encoded by MsgPack as a single byte.
 */
static_assert(IPROTO_DATA < 0x7f && IPROTO_METADATA < 0x7f &&
IPROTO_SQL_INFO < 0x7f, "encoded IPROTO_BODY keys must fit into "\
"one byte");
/**
 * Upper bound, in bytes, of the MsgPack map that encodes @a vclock.
 * The number of entries is the one reported by vclock_size_ignore0();
 * every entry is budgeted as the widest uint32 key plus the widest
 * uint64 value.
 */
static inline uint32_t
mp_sizeof_vclock_ignore0(const struct vclock *vclock)
{
	const uint32_t entry_count = vclock_size_ignore0(vclock);
	const uint32_t entry_size_max = mp_sizeof_uint(UINT32_MAX) +
					mp_sizeof_uint(UINT64_MAX);

	return mp_sizeof_map(entry_count) + entry_count * entry_size_max;
}
static inline char *
mp_encode_vclock_ignore0(char *data, const struct vclock *vclock)
{
data = mp_encode_map(data, vclock_size_ignore0(vclock));
struct vclock_iterator it;
vclock_iterator_init(&it, vclock);
struct vclock_c replica;
replica = vclock_iterator_next(&it);
if (replica.id == 0)
replica = vclock_iterator_next(&it);
for ( ; replica.id < VCLOCK_MAX; replica = vclock_iterator_next(&it)) {
data = mp_encode_uint(data, replica.id);
data = mp_encode_uint(data, replica.lsn);
}
return data;
}
static int
mp_decode_vclock_ignore0(const char **data, struct vclock *vclock)
{
vclock_create(vclock);
if (mp_typeof(**data) != MP_MAP)
return -1;
uint32_t size = mp_decode_map(data);
for (uint32_t i = 0; i < size; i++) {
if (mp_typeof(**data) != MP_UINT)
return -1;
uint32_t id = mp_decode_uint(data);
if (mp_typeof(**data) != MP_UINT)
return -1;
int64_t lsn = mp_decode_uint(data);
/*
* Skip vclock[0] coming from the remote
* instances.
*/
if (lsn > 0 && id != 0)
vclock_follow(vclock, id, lsn);
}
return 0;
}
/**
 * If log_level is 'verbose' or greater,
 * dump the corrupted row contents in hex to the log.
 *
 * The format is similar to the xxd utility: every log line starts
 * with the offset of its first byte relative to @a start, followed
 * by up to 16 bytes printed as two-digit hex numbers.
 *
 * @param start First byte of the row.
 * @param end   One past the last byte of the row.
 */
void dump_row_hex(const char *start, const char *end) {
	if (!say_log_level_is_enabled(S_VERBOSE))
		return;

	char *buf = tt_static_buf();
	const char *buf_end = buf + TT_STATIC_BUF_LEN;

	say_verbose("Got a corrupted row:");
	for (const char *cur = start; cur < end;) {
		char *pos = buf;
		/*
		 * The offset is a ptrdiff_t; convert it explicitly to
		 * the type the %lX conversion expects.
		 */
		pos += snprintf(pos, buf_end - pos, "%08lX: ",
				(unsigned long)(cur - start));
		for (size_t i = 0; i < 16; ++i) {
			pos += snprintf(pos, buf_end - pos, "%02X ",
					(unsigned char)*cur++);
			/*
			 * snprintf() returns the length it wanted to
			 * write, so after a truncation pos may point
			 * past buf_end, not exactly at it.
			 */
			if (cur >= end || pos >= buf_end)
				break;
		}
		/*
		 * On truncation snprintf() has already terminated the
		 * string inside the buffer; never store past its end.
		 */
		if (pos < buf_end)
			*pos = '\0';
		say_verbose("%s", buf);
	}
}
/*
 * Report a row decoding failure: set a ClientError with code @a what
 * and description @a desc_str, then hand the raw row bytes
 * [start, end) to dump_row_hex().
 *
 * NOTE(review): the expansion itself ends with ';', so a call written
 * as `xrow_on_decode_err(...);` produces an extra empty statement.
 * That would break an unbraced `if (...) xrow_on_decode_err(...);
 * else ...`. Consider dropping the trailing ';' after checking all
 * call sites.
 */
#define xrow_on_decode_err(start, end, what, desc_str) do {\
diag_set(ClientError, what, desc_str);\
dump_row_hex(start, end);\
} while (0);
/**
 * Decode a packet header (a MsgPack map) located at *pos into
 * @a header and, if bytes remain before @a end and the request is
 * not a NOP, record the next MsgPack value as the row body.
 *
 * On success *pos is advanced past everything that was consumed.
 *
 * @param header       Output; zeroed before decoding.
 * @param pos          In/out read position.
 * @param end          End of the input buffer.
 * @param end_is_exact When true, bytes left between *pos and @a end
 *                     after decoding are treated as an error.
 *
 * @retval 0  Success.
 * @retval -1 Invalid MsgPack; a ClientError is set and the row is
 *            passed to dump_row_hex().
 */
int
xrow_header_decode(struct xrow_header *header, const char **pos,
const char *end, bool end_is_exact)
{
memset(header, 0, sizeof(struct xrow_header));
const char *tmp = *pos;
const char * const start = *pos;
/*
 * Validate the header value as a whole first: the per-key decoding
 * below does no bounds checks of its own.
 */
if (mp_check(&tmp, end) != 0) {
/* Shared failure path; the checks below jump here via goto. */
error:
xrow_on_decode_err(start, end, ER_INVALID_MSGPACK, "packet header");
return -1;
}
if (mp_typeof(**pos) != MP_MAP)
goto error;
bool has_tsn = false;
uint32_t flags = 0;
uint32_t size = mp_decode_map(pos);
for (uint32_t i = 0; i < size; i++) {
if (mp_typeof(**pos) != MP_UINT)
goto error;
uint64_t key = mp_decode_uint(pos);
/*
 * Reject keys outside the iproto_key_type[] table and values
 * whose MsgPack type differs from the one listed there.
 */
if (key >= IPROTO_KEY_MAX ||
iproto_key_type[key] != mp_typeof(**pos))
goto error;
switch (key) {
case IPROTO_REQUEST_TYPE:
header->type = mp_decode_uint(pos);
break;
case IPROTO_SYNC:
header->sync = mp_decode_uint(pos);
break;
case IPROTO_REPLICA_ID:
header->replica_id = mp_decode_uint(pos);
break;
case IPROTO_GROUP_ID:
header->group_id = mp_decode_uint(pos);
break;
case IPROTO_LSN:
header->lsn = mp_decode_uint(pos);
break;
case IPROTO_TIMESTAMP:
header->tm = mp_decode_double(pos);
break;
case IPROTO_SCHEMA_VERSION:
header->schema_version = mp_decode_uint(pos);
break;
case IPROTO_TSN:
has_tsn = true;
header->tsn = mp_decode_uint(pos);
break;
case IPROTO_FLAGS:
flags = mp_decode_uint(pos);
header->flags = flags;
break;
case IPROTO_STREAM_ID:
header->stream_id = mp_decode_uint(pos);
break;
default:
/* unknown header */
mp_next(pos);
}
}
assert(*pos <= end);
if (!has_tsn) {
/*
* Transaction id is not set so it is a single statement
* transaction.
*/
header->is_commit = true;
}
/* Restore transaction id from lsn and transaction serial number. */
header->tsn = header->lsn - header->tsn;
/* Nop requests aren't supposed to have a body. */
if (*pos < end && header->type != IPROTO_NOP) {
const char *body = *pos;
/* mp_check() advances *pos to the end of the body value. */
if (mp_check(pos, end)) {
xrow_on_decode_err(start, end, ER_INVALID_MSGPACK, "packet body");
return -1;
}
/* The body is referenced in place, not copied. */
header->bodycnt = 1;
header->body[0].iov_base = (void *) body;
header->body[0].iov_len = *pos - body;
}
/* With an exact end no bytes may be left after the packet. */
if (end_is_exact && *pos < end) {
xrow_on_decode_err(start,end, ER_INVALID_MSGPACK, "packet body");
return -1;
}
return 0;
}
/**
 * Decode a MsgPack string holding a textual UUID into @a out and
 * advance *pos past the string.
 *
 * @pre pos points at a valid msgpack
 *
 * @retval 0  Success.
 * @retval -1 The value is not a string or tt_uuid_from_strl()
 *            rejected it. In the latter case *pos is left right
 *            after the string header.
 */
static inline int
xrow_decode_uuid(const char **pos, struct tt_uuid *out)
{
	uint32_t str_len;

	if (mp_typeof(**pos) != MP_STR)
		return -1;

	str_len = mp_decode_strl(pos);
	if (tt_uuid_from_strl(*pos, str_len, out) != 0)
		return -1;

	*pos += str_len;
	return 0;
}
/**
 * Encode @a header as a MsgPack map into memory taken from the
 * current fiber's region and describe the resulting packet in @a out.
 *
 * out[0] receives the newly allocated buffer: @a fixheader_len bytes
 * left untouched for the caller, followed by the encoded header map.
 * The body iovecs of @a header are copied into out[1..bodycnt].
 * Header fields equal to zero are omitted from the map; the request
 * type is always present.
 *
 * @param header        Row header to encode.
 * @param sync          Value stored under IPROTO_SYNC (omitted if 0).
 * @param out           Output iovec array with room for
 *                      1 + header->bodycnt entries.
 * @param fixheader_len Number of bytes reserved in front of the map.
 *
 * @retval -1 Memory allocation failed, diag is set.
 * @retval >0 Number of iovecs filled in.
 */
int
xrow_header_encode(const struct xrow_header *header, uint64_t sync,
		   struct iovec *out, size_t fixheader_len)
{
	/* allocate memory for sign + header */
	out->iov_base = region_alloc(&fiber()->gc, XROW_HEADER_LEN_MAX +
				     fixheader_len);
	if (out->iov_base == NULL) {
		diag_set(OutOfMemory, XROW_HEADER_LEN_MAX + fixheader_len,
			 "gc arena", "xrow header encode");
		return -1;
	}
	char *data = (char *) out->iov_base + fixheader_len;

	/* Header */
	char *d = data + 1; /* Skip 1 byte for MP_MAP */
	int map_size = 0;
	/* The request type is encoded unconditionally. */
	d = mp_encode_uint(d, IPROTO_REQUEST_TYPE);
	d = mp_encode_uint(d, header->type);
	map_size++;
	if (sync) {
		d = mp_encode_uint(d, IPROTO_SYNC);
		d = mp_encode_uint(d, sync);
		map_size++;
	}
	if (header->replica_id) {
		d = mp_encode_uint(d, IPROTO_REPLICA_ID);
		d = mp_encode_uint(d, header->replica_id);
		map_size++;
	}
	if (header->group_id) {
		d = mp_encode_uint(d, IPROTO_GROUP_ID);
		d = mp_encode_uint(d, header->group_id);
		map_size++;
	}
	if (header->lsn) {
		d = mp_encode_uint(d, IPROTO_LSN);
		d = mp_encode_uint(d, header->lsn);
		map_size++;
	}
	if (header->tm) {
		d = mp_encode_uint(d, IPROTO_TIMESTAMP);
		d = mp_encode_double(d, header->tm);
		map_size++;
	}
	/*
	 * We do not encode tsn and is_commit flags for
	 * single-statement transactions to save space in the
	 * binary log. We also encode tsn as a diff from lsn
	 * to save space in every multi-statement transaction row.
	 * The rules when encoding are simple:
	 * - if tsn is *not* encoded, it's a single-statement
	 *   transaction, tsn = lsn, is_commit = true
	 * - if tsn is present, it's a multi-statement
	 *   transaction, tsn = tsn + lsn, check is_commit
	 *   flag to find transaction boundary (last row in the
	 *   transaction stream).
	 */
	uint8_t flags_to_encode = header->flags & ~IPROTO_FLAG_COMMIT;
	if (header->tsn != 0) {
		if (header->tsn != header->lsn || !header->is_commit) {
			/*
			 * Encode a transaction identifier for multi row
			 * transaction members.
			 */
			d = mp_encode_uint(d, IPROTO_TSN);
			/*
			 * Differential encoding: write a transaction serial
			 * number (it is equal to lsn - transaction id) instead.
			 */
			d = mp_encode_uint(d, header->lsn - header->tsn);
			map_size++;
		}
		if (header->is_commit && header->tsn != header->lsn) {
			flags_to_encode |= IPROTO_FLAG_COMMIT;
		}
	}
	if (header->stream_id != 0) {
		d = mp_encode_uint(d, IPROTO_STREAM_ID);
		d = mp_encode_uint(d, header->stream_id);
		map_size++;
	}
	if (flags_to_encode != 0) {
		d = mp_encode_uint(d, IPROTO_FLAGS);
		d = mp_encode_uint(d, flags_to_encode);
		map_size++;
	}
	assert(d <= data + XROW_HEADER_LEN_MAX);
	/* Now that the pair count is known, fill in the skipped byte. */
	mp_encode_map(data, map_size);
	out->iov_len = d - (char *) out->iov_base;
	/*
	 * Check the iovec budget before copying the body vectors, so
	 * that a debug build stops before the copy, not after it.
	 */
	assert(1 + header->bodycnt <= XROW_IOVMAX);
	out++;
	memcpy(out, header->body, sizeof(*out) * header->bodycnt);
	return 1 + header->bodycnt; /* new iovcnt */
}
/**
 * Encode @a in at @a pos as a MsgPack string of UUID_STR_LEN bytes,
 * using the text produced by tt_uuid_str().
 *
 * @return Pointer to the byte right after the encoded string.
 */
static inline char *
xrow_encode_uuid(char *pos, const struct tt_uuid *in)
{
	const char *uuid_text = tt_uuid_str(in);

	return mp_encode_str(pos, uuid_text, UUID_STR_LEN);
}
/*
 * Fixed-layout binary image of an IPROTO packet prefix: a length
 * field followed by a header map with three keys (request type,
 * sync, schema version). The struct is PACKED so that its in-memory
 * layout can be copied to the wire as is.
 *
 * Field name prefixes:
 * m_ - msgpack meta, k_ - key, v_ - value
 */
struct PACKED iproto_header_bin {
uint8_t m_len; /* MP_UINT32 */
uint32_t v_len; /* length */
uint8_t m_header; /* MP_MAP */
uint8_t k_code; /* IPROTO_REQUEST_TYPE */
uint8_t m_code; /* MP_UINT32 */
uint32_t v_code; /* response status */
uint8_t k_sync; /* IPROTO_SYNC */
uint8_t m_sync; /* MP_UINT64 */
uint64_t v_sync; /* sync */
uint8_t k_schema_version; /* IPROTO_SCHEMA_VERSION */
uint8_t m_schema_version; /* MP_UINT32 */
uint32_t v_schema_version; /* schema_version */
};
/* The hand-laid-out image must be exactly IPROTO_HEADER_LEN bytes. */
static_assert(sizeof(struct iproto_header_bin) == IPROTO_HEADER_LEN,
"sizeof(iproto_header_bin)");
void
iproto_header_encode(char *out, uint16_t type, uint64_t sync,
uint32_t schema_version, uint32_t body_length)
{
struct iproto_header_bin header;
header.m_len = 0xce;
/* 5 - sizeof(m_len and v_len fields). */
header.v_len = mp_bswap_u32(sizeof(header) + body_length - 5);
header.m_header = 0x83;
header.k_code = IPROTO_REQUEST_TYPE;
header.m_code = 0xce;
header.v_code = mp_bswap_u32(type);
header.k_sync = IPROTO_SYNC;
header.m_sync = 0xcf;
header.v_sync = mp_bswap_u64(sync);
header.k_schema_version = IPROTO_SCHEMA_VERSION;
header.m_schema_version = 0xce;
header.v_schema_version = mp_bswap_u32(schema_version);
memcpy(out, &header, sizeof(header));
}
/*
 * Fixed-layout binary image of the start of a reply body: a map
 * header, one key and the header of the value stored under it.
 * v_data_len is filled in by the code that uses the template below.
 */
struct PACKED iproto_body_bin {
uint8_t m_body; /* MP_MAP */
uint8_t k_data; /* IPROTO_DATA or errors */
uint8_t m_data; /* MP_STR or MP_ARRAY */
uint32_t v_data_len; /* string length of array size */
};
/* Header image plus body image must add up to the select prefix size. */
static_assert(sizeof(struct iproto_body_bin) + IPROTO_HEADER_LEN ==
IPROTO_SELECT_HEADER_LEN, "size of the prepared select");
/*
 * Template body prefix: m_body = 0x81, k_data = IPROTO_DATA,
 * m_data = 0xdd and a zero length to be overwritten by the user.
 */
static const struct iproto_body_bin iproto_body_bin = {
0x81, IPROTO_DATA, 0xdd, 0
};
/**
 * Build the 4-byte numeric code sent in an error reply: the given
 * error code with the IPROTO_TYPE_ERROR status bits OR-ed in.
 */
static inline uint32_t
iproto_encode_error(uint32_t error)
{
	uint32_t code = error;

	code |= IPROTO_TYPE_ERROR;
	return code;
}
/**
 * Append an IPROTO_OK reply with an empty map as its body to @a out.
 *
 * @retval 0  Success.
 * @retval -1 Out of memory, diag is set.
 */
int
iproto_reply_ok(struct obuf *out, uint64_t sync, uint32_t schema_version)
{
	/* The reply is the fixed header plus a one-byte body. */
	char *reply = (char *)obuf_alloc(out, IPROTO_HEADER_LEN + 1);

	if (reply != NULL) {
		iproto_header_encode(reply, IPROTO_OK, sync,
				     schema_version, 1);
		reply[IPROTO_HEADER_LEN] = 0x80; /* empty MessagePack Map */
		return 0;
	}
	diag_set(OutOfMemory, IPROTO_HEADER_LEN + 1, "obuf_alloc",
		 "buf");
	return -1;
}
/**
 * Append an IPROTO_OK reply to @a out whose body is a two-key map:
 * IPROTO_VERSION -> protocol version and IPROTO_FEATURES -> feature
 * set. By default these are IPROTO_CURRENT_VERSION and
 * IPROTO_CURRENT_FEATURES.
 *
 * @retval 0  Success.
 * @retval -1 Out of memory, diag is set.
 */
int
iproto_reply_id(struct obuf *out, uint64_t sync, uint32_t schema_version)
{
unsigned version = IPROTO_CURRENT_VERSION;
struct iproto_features *features = &IPROTO_CURRENT_FEATURES;
#ifndef NDEBUG
/*
 * Debug builds only: error injections may override the reported
 * version and flip one feature bit.
 */
struct errinj *errinj;
errinj = errinj(ERRINJ_IPROTO_SET_VERSION, ERRINJ_INT);
if (errinj->iparam >= 0)
version = errinj->iparam;
struct iproto_features features_value;
errinj = errinj(ERRINJ_IPROTO_FLIP_FEATURE, ERRINJ_INT);
if (errinj->iparam >= 0 && errinj->iparam < iproto_feature_id_MAX) {
int feature_id = errinj->iparam;
/* Flip the bit in a local copy, not in the shared feature set. */
features_value = *features;
features = &features_value;
if (iproto_features_test(features, feature_id))
iproto_features_clear(features, feature_id);
else
iproto_features_set(features, feature_id);
}
#endif
/* Compute the exact reply size: header plus the encoded body map. */
size_t size = IPROTO_HEADER_LEN;
size += mp_sizeof_map(2);
size += mp_sizeof_uint(IPROTO_VERSION);
size += mp_sizeof_uint(version);
size += mp_sizeof_uint(IPROTO_FEATURES);
size += mp_sizeof_iproto_features(features);
char *buf = obuf_alloc(out, size);
if (buf == NULL) {
diag_set(OutOfMemory, size, "obuf_alloc", "buf");
return -1;
}
/* The body is encoded first; the header goes in last. */
char *data = buf + IPROTO_HEADER_LEN;
data = mp_encode_map(data, 2);
data = mp_encode_uint(data, IPROTO_VERSION);
data = mp_encode_uint(data, version);
data = mp_encode_uint(data, IPROTO_FEATURES);
data = mp_encode_iproto_features(data, features);
assert(size == (size_t)(data - buf));
iproto_header_encode(buf, IPROTO_OK, sync, schema_version,
size - IPROTO_HEADER_LEN);
return 0;
}
/**
 * Append an IPROTO_OK reply to @a out whose body is a one-key map:
 * IPROTO_VCLOCK -> @a vclock encoded by mp_encode_vclock_ignore0().
 *
 * Memory is first reserved for the worst case, and only the bytes
 * actually produced are allocated afterwards.
 *
 * @retval 0  Success.
 * @retval -1 Out of memory, diag is set.
 */
int
iproto_reply_vclock(struct obuf *out, const struct vclock *vclock,
		    uint64_t sync, uint32_t schema_version)
{
	const size_t reserve_size = IPROTO_HEADER_LEN + mp_sizeof_map(1) +
		mp_sizeof_uint(UINT32_MAX) + mp_sizeof_vclock_ignore0(vclock);
	char *reply = obuf_reserve(out, reserve_size);

	if (reply == NULL) {
		diag_set(OutOfMemory, reserve_size,
			 "obuf_alloc", "buf");
		return -1;
	}

	/* Body first, right behind the room left for the header. */
	char *cursor = mp_encode_map(reply + IPROTO_HEADER_LEN, 1);
	cursor = mp_encode_uint(cursor, IPROTO_VCLOCK);
	cursor = mp_encode_vclock_ignore0(cursor, vclock);

	const size_t reply_size = cursor - reply;
	assert(reply_size <= reserve_size);
	iproto_header_encode(reply, IPROTO_OK, sync, schema_version,
			     reply_size - IPROTO_HEADER_LEN);

	/* Commit exactly the bytes written into the reserved area. */
	char *committed = obuf_alloc(out, reply_size);
	(void) committed;
	assert(committed == reply);
	return 0;
}
/**
 * Append an IPROTO_OK reply to @a out whose body is a one-key map:
 * IPROTO_BALLOT -> map describing @a ballot (read-only flags,
 * anonymity, boot state, vclock, gc vclock and ability to lead).
 *
 * Memory is first reserved for the worst case, and only the bytes
 * actually produced are allocated afterwards.
 *
 * @retval 0  Success.
 * @retval -1 Out of memory, diag is set.
 */
int
iproto_reply_vote(struct obuf *out, const struct ballot *ballot,
		  uint64_t sync, uint32_t schema_version)
{
	/*
	 * Number of key-value pairs in the ballot map. The same
	 * constant feeds both the size estimate and the encoder so
	 * the two cannot drift apart.
	 */
	const uint32_t ballot_map_size = 7;
	size_t max_size = IPROTO_HEADER_LEN + mp_sizeof_map(1) +
		mp_sizeof_uint(UINT32_MAX) +
		mp_sizeof_map(ballot_map_size) +
		mp_sizeof_uint(UINT32_MAX) + mp_sizeof_bool(ballot->is_ro_cfg) +
		mp_sizeof_uint(UINT32_MAX) + mp_sizeof_bool(ballot->is_ro) +
		mp_sizeof_uint(IPROTO_BALLOT_IS_ANON) +
		mp_sizeof_bool(ballot->is_anon) +
		mp_sizeof_uint(IPROTO_BALLOT_IS_BOOTED) +
		mp_sizeof_bool(ballot->is_booted) +
		mp_sizeof_uint(UINT32_MAX) +
		mp_sizeof_vclock_ignore0(&ballot->vclock) +
		mp_sizeof_uint(UINT32_MAX) +
		mp_sizeof_vclock_ignore0(&ballot->gc_vclock) +
		mp_sizeof_uint(IPROTO_BALLOT_CAN_LEAD) +
		mp_sizeof_bool(ballot->can_lead);

	char *buf = obuf_reserve(out, max_size);
	if (buf == NULL) {
		diag_set(OutOfMemory, max_size,
			 "obuf_alloc", "buf");
		return -1;
	}

	/* Body first, right behind the room left for the header. */
	char *data = buf + IPROTO_HEADER_LEN;
	data = mp_encode_map(data, 1);
	data = mp_encode_uint(data, IPROTO_BALLOT);
	data = mp_encode_map(data, ballot_map_size);
	data = mp_encode_uint(data, IPROTO_BALLOT_IS_RO_CFG);
	data = mp_encode_bool(data, ballot->is_ro_cfg);
	data = mp_encode_uint(data, IPROTO_BALLOT_IS_RO);
	data = mp_encode_bool(data, ballot->is_ro);
	data = mp_encode_uint(data, IPROTO_BALLOT_IS_ANON);
	data = mp_encode_bool(data, ballot->is_anon);
	data = mp_encode_uint(data, IPROTO_BALLOT_IS_BOOTED);
	data = mp_encode_bool(data, ballot->is_booted);
	data = mp_encode_uint(data, IPROTO_BALLOT_VCLOCK);
	data = mp_encode_vclock_ignore0(data, &ballot->vclock);
	data = mp_encode_uint(data, IPROTO_BALLOT_GC_VCLOCK);
	data = mp_encode_vclock_ignore0(data, &ballot->gc_vclock);
	data = mp_encode_uint(data, IPROTO_BALLOT_CAN_LEAD);
	data = mp_encode_bool(data, ballot->can_lead);
	size_t size = data - buf;
	assert(size <= max_size);

	iproto_header_encode(buf, IPROTO_OK, sync, schema_version,
			     size - IPROTO_HEADER_LEN);

	/* Commit exactly the bytes written into the reserved area. */
	char *ptr = obuf_alloc(out, size);
	(void) ptr;
	assert(ptr == buf);
	return 0;
}
/**
 * Error callback used with the mpstream objects in this file:
 * @a error_ctx points at a bool, which is set to true.
 */
static void
mpstream_error_handler(void *error_ctx)
{
	bool *is_error = (bool *)error_ctx;

	*is_error = true;
}
/**
 * Encode the body of an error reply into @a stream: a two-key map
 * with IPROTO_ERROR_24 -> error message string and
 * IPROTO_ERROR -> the error as encoded by error_to_mpstream_noext().
 */
static void
mpstream_iproto_encode_error(struct mpstream *stream, const struct error *error)
{
mpstream_encode_map(stream, 2);
/* Plain message string. */
mpstream_encode_uint(stream, IPROTO_ERROR_24);
mpstream_encode_str(stream, error->errmsg);
/* Error object as produced by error_to_mpstream_noext(). */
mpstream_encode_uint(stream, IPROTO_ERROR);
error_to_mpstream_noext(error, stream);
}
/**
 * Append an error reply for @a e to @a out: a fixed-size header
 * followed by the body produced by mpstream_iproto_encode_error().
 *
 * @retval 0  Success.
 * @retval -1 The header could not be allocated (no diag is set
 *            here), or the stream reported an error while the body
 *            was being written.
 */
int
iproto_reply_error(struct obuf *out, const struct error *e, uint64_t sync,
uint32_t schema_version)
{
/* Take the room for the header first; it is filled in last. */
char *header = (char *)obuf_alloc(out, IPROTO_HEADER_LEN);
if (header == NULL)
return -1;
bool is_error = false;
struct mpstream stream;
mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb,
mpstream_error_handler, &is_error);
/* Remember the buffer size to measure the body length below. */
uint32_t used = obuf_size(out);
mpstream_iproto_encode_error(&stream, e);
mpstream_flush(&stream);
uint32_t errcode = box_error_code(e);
/* The header is written even if the stream has failed. */
iproto_header_encode(header, iproto_encode_error(errcode), sync,
schema_version, obuf_size(out) - used);
/* Malformed packet appears to be a lesser evil than abort. */
return is_error ? -1 : 0;
}
/**
 * Encode an error reply for @a e and write it straight to the file
 * descriptor @a fd, header first and body second. The body is built
 * in the current fiber's region, which is truncated back to its
 * initial size before returning.
 *
 * Nothing is reported to the caller: on an encoding or allocation
 * failure nothing is written, and the results of write() are ignored.
 */
void
iproto_do_write_error(int fd, const struct error *e, uint32_t schema_version,
uint64_t sync)
{
bool is_error = false;
struct mpstream stream;
struct region *region = &fiber()->gc;
mpstream_init(&stream, region, region_reserve_cb, region_alloc_cb,
mpstream_error_handler, &is_error);
/* Savepoint used by the cleanup below to release the region memory. */
size_t region_svp = region_used(region);
mpstream_iproto_encode_error(&stream, e);
mpstream_flush(&stream);
if (is_error)
goto cleanup;
/* Everything allocated since the savepoint is the encoded body. */
size_t payload_size = region_used(region) - region_svp;
/*
 * NOTE(review): region_join() presumably returns the last
 * payload_size bytes as one contiguous chunk - confirm.
 */
char *payload = region_join(region, payload_size);
if (payload == NULL)
goto cleanup;
uint32_t errcode = box_error_code(e);
char header[IPROTO_HEADER_LEN];
iproto_header_encode(header, iproto_encode_error(errcode), sync,
schema_version, payload_size);
ssize_t unused;
ERROR_INJECT_YIELD(ERRINJ_IPROTO_WRITE_ERROR_DELAY);
/* Best effort: the results of both writes are deliberately dropped. */
unused = write(fd, header, sizeof(header));
unused = write(fd, payload, payload_size);
(void) unused;
cleanup:
region_truncate(region, region_svp);
}
/**
 * Allocate @a size bytes in @a buf for a reply header that will be
 * filled in later, and store in @a svp a savepoint taken right
 * before that allocation.
 *
 * @retval 0  Success.
 * @retval -1 Out of memory, diag is set.
 */
int
iproto_prepare_header(struct obuf *buf, struct obuf_svp *svp, size_t size)
{
	/**
	 * Reserve memory before taking a savepoint.
	 * This ensures that we get a contiguous chunk of memory
	 * and the savepoint is pointing at the beginning of it.
	 */
	if (obuf_reserve(buf, size) == NULL) {
		diag_set(OutOfMemory, size, "obuf_reserve", "ptr");
		return -1;
	}
	*svp = obuf_create_svp(buf);

	void *header = obuf_alloc(buf, size);
	assert(header != NULL);
	(void) header;
	return 0;
}
void
iproto_reply_select(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
uint32_t schema_version, uint32_t count)
{
char *pos = (char *) obuf_svp_to_ptr(buf, svp);
iproto_header_encode(pos, IPROTO_OK, sync, schema_version,
obuf_size(buf) - svp->used -
IPROTO_HEADER_LEN);
struct iproto_body_bin body = iproto_body_bin;
body.v_data_len = mp_bswap_u32(count);
memcpy(pos + IPROTO_HEADER_LEN, &body, sizeof(body));
}
/**
 * Decode the body of an SQL request: locate the values stored under
 * IPROTO_SQL_TEXT, IPROTO_STMT_ID and IPROTO_SQL_BIND and save
 * pointers to them in @a request. The values are not copied.
 *
 * Exactly one of SQL text and statement id must be present.
 *
 * @param row     Row with the request body.
 * @param request Output.
 *
 * @retval 0  Success.
 * @retval -1 The body is missing or malformed, diag is set.
 */
int
xrow_decode_sql(const struct xrow_header *row, struct sql_request *request)
{
	if (row->bodycnt == 0) {
		diag_set(ClientError, ER_INVALID_MSGPACK, "missing request body");
		/* Same error code as every other failure path here. */
		return -1;
	}
	assert(row->bodycnt == 1);
	const char *data = (const char *) row->body[0].iov_base;
	const char *end = data + row->body[0].iov_len;
	assert((end - data) > 0);

	if (mp_typeof(*data) != MP_MAP) {
		xrow_on_decode_err(row->body[0].iov_base, end, ER_INVALID_MSGPACK,
				   "packet body");
		return -1;
	}

	uint32_t map_size = mp_decode_map(&data);
	request->sql_text = NULL;
	request->bind = NULL;
	request->stmt_id = NULL;
	for (uint32_t i = 0; i < map_size; ++i) {
		/*
		 * The key is recognized by its first encoded byte;
		 * a recognized key is then skipped as a single byte.
		 */
		uint8_t key = *data;
		if (key != IPROTO_SQL_BIND && key != IPROTO_SQL_TEXT &&
		    key != IPROTO_STMT_ID) {
			mp_next(&data);         /* skip the key */
			mp_next(&data);         /* skip the value */
			continue;
		}
		const char *value = ++data;     /* skip the key */
		mp_next(&data);                 /* skip the value */
		if (key == IPROTO_SQL_BIND)
			request->bind = value;
		else if (key == IPROTO_SQL_TEXT)
			request->sql_text = value;
		else
			request->stmt_id = value;
	}
	if (request->sql_text != NULL && request->stmt_id != NULL) {
		xrow_on_decode_err(row->body[0].iov_base, end, ER_INVALID_MSGPACK,
				   "SQL text and statement id are incompatible "\
				   "options in one request: choose one");
		return -1;
	}
	if (request->sql_text == NULL && request->stmt_id == NULL) {
		xrow_on_decode_err(row->body[0].iov_base, end,
				   ER_MISSING_REQUEST_FIELD,
				   tt_sprintf("%s or %s",
					      iproto_key_name(IPROTO_SQL_TEXT),
					      iproto_key_name(IPROTO_STMT_ID)));
		return -1;
	}
	return 0;
}
/**
 * Fill in the IPROTO_OK header at the position remembered in
 * @a svp. The body length is everything stored in @a buf after the
 * savepoint, minus the header itself.
 */
void
iproto_reply_sql(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
		 uint32_t schema_version)
{
	char *reply = (char *) obuf_svp_to_ptr(buf, svp);
	const uint32_t body_len = obuf_size(buf) - svp->used -
				  IPROTO_HEADER_LEN;

	iproto_header_encode(reply, IPROTO_OK, sync, schema_version,
			     body_len);
}
void
iproto_reply_chunk(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
uint32_t schema_version)
{
char *pos = (char *) obuf_svp_to_ptr(buf, svp);
iproto_header_encode(pos, IPROTO_CHUNK, sync, schema_version,
obuf_size(buf) - svp->used - IPROTO_HEADER_LEN);
struct iproto_body_bin body = iproto_body_bin;
body.v_data_len = mp_bswap_u32(1);
memcpy(pos + IPROTO_HEADER_LEN, &body, sizeof(body));
}
/**
 * Decode the body of a DML request from @a row into @a request.
 * Scalar fields are decoded by value; tuple, key, ops and tuple
 * meta are stored as begin/end pointers into the row body.
 *
 * @param row     Row to decode; also saved in request->header.
 * @param request Output; zeroed first.
 * @param key_map Bit mask of keys that must be present. The bit of
 *                every key found in the body is cleared; a bit
 *                still set at the end is reported as a missing
 *                field.
 *
 * @retval 0  Success.
 * @retval -1 Malformed body or a required key is missing, diag is
 *            set.
 */
int
xrow_decode_dml(struct xrow_header *row, struct request *request,
uint64_t key_map)
{
memset(request, 0, sizeof(*request));
request->header = row;
request->type = row->type;
const char *start = NULL;
const char *end = NULL;
/* With no body, only the required-keys check below is left to do. */
if (row->bodycnt == 0)
goto done;
assert(row->bodycnt == 1);
const char *data = start = (const char *) row->body[0].iov_base;
end = data + row->body[0].iov_len;
assert((end - data) > 0);
if (mp_typeof(*data) != MP_MAP) {
/* Shared failure path; the type check in the loop jumps here. */
error:
xrow_on_decode_err(row->body[0].iov_base, end, ER_INVALID_MSGPACK,
"packet body");
return -1;
}
uint32_t size = mp_decode_map(&data);
for (uint32_t i = 0; i < size; i++) {
/* Pairs rejected by iproto_dml_body_has_key() are skipped whole. */
if (! iproto_dml_body_has_key(data, end)) {
mp_next(&data);
mp_next(&data);
continue;
}
uint64_t key = mp_decode_uint(&data);
/* value stays at the start of the value, data moves past it. */
const char *value = data;
mp_next(&data);
if (key >= IPROTO_KEY_MAX ||
iproto_key_type[key] != mp_typeof(*value))
goto error;
/* Mark the key as seen. */
key_map &= ~iproto_key_bit(key);
switch (key) {
case IPROTO_SPACE_ID:
request->space_id = mp_decode_uint(&value);
break;
case IPROTO_INDEX_ID:
request->index_id = mp_decode_uint(&value);
break;
case IPROTO_OFFSET:
request->offset = mp_decode_uint(&value);
break;
case IPROTO_INDEX_BASE:
request->index_base = mp_decode_uint(&value);
break;
case IPROTO_LIMIT:
request->limit = mp_decode_uint(&value);
break;
case IPROTO_ITERATOR:
request->iterator = mp_decode_uint(&value);
break;
case IPROTO_TUPLE:
request->tuple = value;
request->tuple_end = data;
break;
case IPROTO_KEY:
request->key = value;
request->key_end = data;
break;
case IPROTO_OPS:
request->ops = value;
request->ops_end = data;
break;
case IPROTO_TUPLE_META:
request->tuple_meta = value;
request->tuple_meta_end = data;
break;
default:
break;
}
}
done:
/* Any bit still set stands for a required key that was not found. */
if (key_map) {
enum iproto_key key = (enum iproto_key) bit_ctz_u64(key_map);
xrow_on_decode_err(start, end, ER_MISSING_REQUEST_FIELD,
iproto_key_name(key));
return -1;
}
return 0;
}
/**
 * Format a human-readable description of @a request into @a buf:
 * type name, replica id, lsn, space id and index id, followed by
 * the key, tuple and ops when they are set.
 *
 * NOTE(review): SNPRINT is a project macro not visible here;
 * presumably it advances buf/size, accumulates the printed length in
 * total and returns from this function on a formatting error -
 * confirm against its definition.
 *
 * @return The value accumulated in total.
 */
static int
request_snprint(char *buf, int size, const struct request *request)
{
int total = 0;
SNPRINT(total, snprintf, buf, size, "{type: '%s', "
"replica_id: %u, lsn: %lld, "
"space_id: %u, index_id: %u",
iproto_type_name(request->type),
(unsigned) request->header->replica_id,
(long long) request->header->lsn,
(unsigned) request->space_id,
(unsigned) request->index_id);
/* The optional parts are printed with mp_snprint(). */
if (request->key != NULL) {
SNPRINT(total, snprintf, buf, size, ", key: ");
SNPRINT(total, mp_snprint, buf, size, request->key);
}
if (request->tuple != NULL) {
SNPRINT(total, snprintf, buf, size, ", tuple: ");
SNPRINT(total, mp_snprint, buf, size, request->tuple);
}
if (request->ops != NULL) {
SNPRINT(total, snprintf, buf, size, ", ops: ");
SNPRINT(total, mp_snprint, buf, size, request->ops);
}
SNPRINT(total, snprintf, buf, size, "}");
return total;
}
/**
 * Format @a request into the buffer returned by tt_static_buf().
 *
 * @return The formatted string, or a fixed placeholder string if
 *         request_snprint() returned a negative value.
 */
const char *
request_str(const struct request *request)
{
	char *text = tt_static_buf();
	int rc = request_snprint(text, TT_STATIC_BUF_LEN, request);

	return rc < 0 ? "<failed to format request>" : text;
}
/**
 * Encode @a request as a MsgPack map into memory allocated on
 * @a region and describe it in iov[0]. Scalar fields are encoded
 * only when non-zero; key, ops, tuple meta and tuple are copied
 * verbatim when their pointers are set.
 *
 * @retval -1 Memory allocation failed, diag is set.
 * @retval 0  No field was encoded; iov is left untouched.
 * @retval 1  Number of iovecs filled in.
 */
int
xrow_encode_dml(const struct request *request, struct region *region,
struct iovec *iov)
{
int iovcnt = 1;
/* Budget for everything encoded here except the copied blobs. */
const int MAP_LEN_MAX = 40;
uint32_t key_len = request->key_end - request->key;
uint32_t ops_len = request->ops_end - request->ops;
uint32_t tuple_meta_len = request->tuple_meta_end - request->tuple_meta;
uint32_t tuple_len = request->tuple_end - request->tuple;
uint32_t len = MAP_LEN_MAX + key_len + ops_len + tuple_meta_len +
tuple_len;
char *begin = (char *) region_alloc(region, len);
if (begin == NULL) {
diag_set(OutOfMemory, len, "region_alloc", "begin");
return -1;
}
char *pos = begin + 1; /* skip 1 byte for MP_MAP */
int map_size = 0;
if (request->space_id) {
pos = mp_encode_uint(pos, IPROTO_SPACE_ID);
pos = mp_encode_uint(pos, request->space_id);
map_size++;
}
if (request->index_id) {
pos = mp_encode_uint(pos, IPROTO_INDEX_ID);
pos = mp_encode_uint(pos, request->index_id);
map_size++;
}
if (request->index_base) { /* UPDATE/UPSERT */
pos = mp_encode_uint(pos, IPROTO_INDEX_BASE);
pos = mp_encode_uint(pos, request->index_base);
map_size++;
}
/* The blobs below are already MsgPack; they are copied as is. */
if (request->key) {
pos = mp_encode_uint(pos, IPROTO_KEY);
memcpy(pos, request->key, key_len);
pos += key_len;
map_size++;
}
if (request->ops) {
pos = mp_encode_uint(pos, IPROTO_OPS);
memcpy(pos, request->ops, ops_len);
pos += ops_len;
map_size++;
}
if (request->tuple_meta) {
pos = mp_encode_uint(pos, IPROTO_TUPLE_META);
memcpy(pos, request->tuple_meta, tuple_meta_len);
pos += tuple_meta_len;
map_size++;
}
if (request->tuple) {
pos = mp_encode_uint(pos, IPROTO_TUPLE);
memcpy(pos, request->tuple, tuple_len);
pos += tuple_len;
map_size++;
}
/* Nothing was encoded: report an empty body. */
if (map_size == 0)
return 0;
assert(pos <= begin + len);
/* Now that the pair count is known, fill in the skipped byte. */
mp_encode_map(begin, map_size);
iov[0].iov_base = begin;
iov[0].iov_len = pos - begin;
return iovcnt;
}
/**
 * Decode the body of an ID request: protocol version and feature
 * set. Keys that are absent leave the version at 0 and the feature
 * set as created by iproto_features_create().
 *
 * @retval 0  Success.
 * @retval -1 The body is missing or malformed, diag is set.
 */
int
xrow_decode_id(const struct xrow_header *row, struct id_request *request)
{
	if (row->bodycnt == 0) {
		diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
		return -1;
	}
	assert(row->bodycnt == 1);
	const char *data = (const char *)row->body[0].iov_base;
	const char *end = data + row->body[0].iov_len;
	if (mp_typeof(*data) != MP_MAP)
		goto error;

	request->version = 0;
	iproto_features_create(&request->features);

	const char *cursor = data;
	uint32_t pairs_left = mp_decode_map(&cursor);
	while (pairs_left-- > 0) {
		if (mp_typeof(*cursor) != MP_UINT)
			goto error;
		uint64_t key = mp_decode_uint(&cursor);
		if (key >= IPROTO_KEY_MAX)
			goto error;
		if (iproto_key_type[key] != mp_typeof(*cursor))
			goto error;

		if (key == IPROTO_VERSION) {
			request->version = mp_decode_uint(&cursor);
		} else if (key == IPROTO_FEATURES) {
			if (mp_decode_iproto_features(
					&cursor, &request->features) != 0)
				goto error;
		} else {
			/* Ignore unknown keys for forward compatibility. */
			mp_next(&cursor);
		}
	}
	return 0;
error:
	xrow_on_decode_err(data, end, ER_INVALID_MSGPACK, "request body");
	return -1;
}
/**
 * Encode the synchro request @a req into @a body and set up @a row
 * to reference it as its single body vector. The body is a map with
 * the replica id and lsn, plus the term for promote requests.
 *
 * @param row  Output row; zeroed first.
 * @param body Caller-provided buffer for the encoded body.
 * @param req  Request to encode.
 */
void
xrow_encode_synchro(struct xrow_header *row, char *body,
		    const struct synchro_request *req)
{
	assert(iproto_type_is_synchro_request(req->type));

	const bool with_term = iproto_type_is_promote_request(req->type);
	char *cursor = mp_encode_map(body, with_term ? 3 : 2);

	cursor = mp_encode_uint(cursor, IPROTO_REPLICA_ID);
	cursor = mp_encode_uint(cursor, req->replica_id);
	cursor = mp_encode_uint(cursor, IPROTO_LSN);
	cursor = mp_encode_uint(cursor, req->lsn);
	if (with_term) {
		cursor = mp_encode_uint(cursor, IPROTO_TERM);
		cursor = mp_encode_uint(cursor, req->term);
	}
	assert(cursor - body < XROW_SYNCHRO_BODY_LEN_MAX);

	memset(row, 0, sizeof(*row));
	row->type = req->type;
	row->bodycnt = 1;
	row->body[0].iov_base = body;
	row->body[0].iov_len = cursor - body;
}
/**
 * Decode the body of a synchro request from @a row into @a req.
 * The request type and origin id are taken from the row header.
 *
 * Pairs whose key is not an unsigned integer are skipped. A key
 * outside the iproto_key_type[] table, or a value whose MsgPack type
 * differs from the one listed there, is an error. Other known keys
 * are skipped.
 *
 * @retval 0  Success.
 * @retval -1 The body is missing or malformed, diag is set.
 */
int
xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req)
{
	if (row->bodycnt == 0) {
		diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
		return -1;
	}

	assert(row->bodycnt == 1);

	const char * const data = (const char *)row->body[0].iov_base;
	const char * const end = data + row->body[0].iov_len;
	if (mp_typeof(*data) != MP_MAP) {
		xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
				   "request body");
		return -1;
	}

	memset(req, 0, sizeof(*req));
	const char *d = data;
	uint32_t map_size = mp_decode_map(&d);
	for (uint32_t i = 0; i < map_size; i++) {
		if (mp_typeof(*d) != MP_UINT) {
			mp_next(&d);
			mp_next(&d);
			continue;
		}
		/*
		 * Keep the key at full width so that a large key cannot
		 * be truncated into a known one before the range check.
		 */
		uint64_t key = mp_decode_uint(&d);
		/*
		 * d now points at the value: it is the value's type that
		 * must match the table, the key is known to be MP_UINT.
		 */
		if (key >= IPROTO_KEY_MAX ||
		    iproto_key_type[key] != mp_typeof(*d)) {
			xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
					   "request body");
			return -1;
		}
		switch (key) {
		case IPROTO_REPLICA_ID:
			req->replica_id = mp_decode_uint(&d);
			break;
		case IPROTO_LSN:
			req->lsn = mp_decode_uint(&d);
			break;
		case IPROTO_TERM:
			req->term = mp_decode_uint(&d);
			break;
		default:
			mp_next(&d);
		}
	}

	req->type = row->type;
	req->origin_id = row->replica_id;

	return 0;
}
/**
 * Encode the raft request @a r into memory allocated on @a region
 * and set up @a row (type IPROTO_RAFT, group GROUP_LOCAL) to
 * reference it as its single body vector.
 *
 * @retval 0  Success.
 * @retval -1 Memory allocation failed, diag is set; @a row is left
 *            untouched.
 */
int
xrow_encode_raft(struct xrow_header *row, struct region *region,
		 const struct raft_request *r)
{
	const bool has_vote = r->vote != 0;
	const bool has_state = r->state != 0;
	const bool has_vclock = r->vclock != NULL;

	/*
	 * Terms is encoded always. Sometimes the rest can be even ignored if
	 * the term is too old.
	 */
	int map_size = 1;
	size_t size = mp_sizeof_uint(IPROTO_RAFT_TERM) +
		      mp_sizeof_uint(r->term);
	if (has_vote) {
		++map_size;
		size += mp_sizeof_uint(IPROTO_RAFT_VOTE) +
			mp_sizeof_uint(r->vote);
	}
	if (has_state) {
		++map_size;
		size += mp_sizeof_uint(IPROTO_RAFT_STATE) +
			mp_sizeof_uint(r->state);
	}
	if (has_vclock) {
		++map_size;
		size += mp_sizeof_uint(IPROTO_RAFT_VCLOCK) +
			mp_sizeof_vclock_ignore0(r->vclock);
	}
	size += mp_sizeof_map(map_size);

	char *body = region_alloc(region, size);
	if (body == NULL) {
		diag_set(OutOfMemory, size, "region_alloc", "buf");
		return -1;
	}

	/* Encode the fields in the same order they were sized. */
	char *cursor = mp_encode_map(body, map_size);
	cursor = mp_encode_uint(cursor, IPROTO_RAFT_TERM);
	cursor = mp_encode_uint(cursor, r->term);
	if (has_vote) {
		cursor = mp_encode_uint(cursor, IPROTO_RAFT_VOTE);
		cursor = mp_encode_uint(cursor, r->vote);
	}
	if (has_state) {
		cursor = mp_encode_uint(cursor, IPROTO_RAFT_STATE);
		cursor = mp_encode_uint(cursor, r->state);
	}
	if (has_vclock) {
		cursor = mp_encode_uint(cursor, IPROTO_RAFT_VCLOCK);
		cursor = mp_encode_vclock_ignore0(cursor, r->vclock);
	}

	memset(row, 0, sizeof(*row));
	row->type = IPROTO_RAFT;
	row->group_id = GROUP_LOCAL;
	row->bodycnt = 1;
	row->body[0].iov_base = body;
	row->body[0].iov_len = cursor - body;
	return 0;
}
int
xrow_decode_raft(const struct xrow_header *row, struct raft_request *r,
struct vclock *vclock)
{
assert(row->type == IPROTO_RAFT);
if (row->bodycnt != 1 || row->group_id != GROUP_LOCAL) {
diag_set(ClientError, ER_INVALID_MSGPACK,
"malformed raft request");
return -1;
}
memset(r, 0, sizeof(*r));
const char *begin = row->body[0].iov_base;
const char *end = begin + row->body[0].iov_len;
const char *pos = begin;
uint32_t map_size = mp_decode_map(&pos);
for (uint32_t i = 0; i < map_size; ++i)
{
if (mp_typeof(*pos) != MP_UINT)
goto bad_msgpack;
uint64_t key = mp_decode_uint(&pos);
switch (key) {
case IPROTO_RAFT_TERM:
if (mp_typeof(*pos) != MP_UINT)
goto bad_msgpack;
r->term = mp_decode_uint(&pos);
break;
case IPROTO_RAFT_VOTE:
if (mp_typeof(*pos) != MP_UINT)
goto bad_msgpack;
r->vote = mp_decode_uint(&pos);
break;
case IPROTO_RAFT_STATE:
if (mp_typeof(*pos) != MP_UINT)
goto bad_msgpack;
r->state = mp_decode_uint(&pos);
break;
case IPROTO_RAFT_VCLOCK:
r->vclock = vclock;
if (r->vclock == NULL)
mp_next(&pos);
else if (mp_decode_vclock_ignore0(&pos, vclock) != 0)
goto bad_msgpack;
break;
default:
mp_next(&pos);
break;
}
}
return 0;
bad_msgpack:
xrow_on_decode_err(begin, end, ER_INVALID_MSGPACK, "raft body");
return -1;
}
/**
 * Encode @a row into @a out as a complete packet: a 5-byte length
 * prefix, the header map and the body vectors.
 *
 * @retval -1 Header encoding failed.
 * @retval >0 Number of iovecs filled in.
 */
int
xrow_to_iovec(const struct xrow_header *row, struct iovec *out)
{
	/* Size of the length prefix: one marker byte plus a uint32. */
	enum { FIXHEADER_LEN = 5 };

	assert(mp_sizeof_uint(UINT32_MAX) == FIXHEADER_LEN);
	int iovcnt = xrow_header_encode(row, row->sync, out, FIXHEADER_LEN);
	if (iovcnt < 0)
		return -1;

	/* The encoded length does not count the prefix itself. */
	ssize_t packet_len = -FIXHEADER_LEN;
	for (int i = 0; i < iovcnt; i++)
		packet_len += out[i].iov_len;

	/* Encode length */
	char *fixheader = (char *) out[0].iov_base;
	fixheader[0] = 0xce; /* MP_UINT32 */
	store_u32(fixheader + 1, mp_bswap_u32(packet_len));

	assert(iovcnt <= XROW_IOVMAX);
	return iovcnt;
}
/**
 * Decode the body of a CALL or EVAL request from @a row into
 * @a request. Name, expression and arguments are stored as pointers
 * into the row body, not copied.
 *
 * An EVAL request must carry IPROTO_EXPR, any other request type
 * must carry IPROTO_FUNCTION_NAME. Missing arguments are replaced
 * with a static one-byte value (0x90).
 *
 * @retval 0  Success.
 * @retval -1 The body is missing or malformed, or a required field
 *            is absent; diag is set.
 */
int
xrow_decode_call(const struct xrow_header *row, struct call_request *request)
{
if (row->bodycnt == 0) {
diag_set(ClientError, ER_INVALID_MSGPACK,
"missing request body");
return -1;
}
assert(row->bodycnt == 1);
const char *data = (const char *) row->body[0].iov_base;
const char *end = data + row->body[0].iov_len;
assert((end - data) > 0);
if (mp_typeof(*data) != MP_MAP) {
/* Shared failure path; the checks in the loop jump here. */
error:
xrow_on_decode_err(row->body[0].iov_base, end, ER_INVALID_MSGPACK,
"packet body");
return -1;
}
memset(request, 0, sizeof(*request));
uint32_t map_size = mp_decode_map(&data);
for (uint32_t i = 0; i < map_size; ++i) {
if (mp_typeof(*data) != MP_UINT)
goto error;
uint64_t key = mp_decode_uint(&data);
/* value stays at the start of the value, data moves past it. */
const char *value = data;
mp_next(&data);
switch (key) {
case IPROTO_FUNCTION_NAME:
if (mp_typeof(*value) != MP_STR)
goto error;
request->name = value;
break;
case IPROTO_EXPR:
if (mp_typeof(*value) != MP_STR)
goto error;
request->expr = value;
break;
case IPROTO_TUPLE:
if (mp_typeof(*value) != MP_ARRAY)
goto error;
request->args = value;
request->args_end = data;
break;
default:
continue; /* unknown key */
}
}
/* Which field is mandatory depends on the request type. */
if (row->type == IPROTO_EVAL) {
if (request->expr == NULL) {
xrow_on_decode_err(row->body[0].iov_base, end, ER_MISSING_REQUEST_FIELD,
iproto_key_name(IPROTO_EXPR));
return -1;
}
} else if (request->name == NULL) {
assert(row->type == IPROTO_CALL_16 ||
row->type == IPROTO_CALL);
xrow_on_decode_err(row->body[0].iov_base, end, ER_MISSING_REQUEST_FIELD,
iproto_key_name(IPROTO_FUNCTION_NAME));
return -1;
}
/* No arguments given: point at a static default instead. */
if (request->args == NULL) {
static const char empty_args[] = { (char)0x90 };
request->args = empty_args;
request->args_end = empty_args + sizeof(empty_args);
}
return 0;
}
/**
 * Decode the body of an AUTH request.
 *
 * The decoded fields are not copied: request->user_name and
 * request->scramble point into the row body. Both IPROTO_USER_NAME
 * (a string) and IPROTO_TUPLE (an array) are mandatory; unknown keys
 * are ignored.
 *
 * @param row Row to decode.
 * @param[out] request Decoded request.
 *
 * @retval  0 Success.
 * @retval -1 Missing body, malformed body or a missing mandatory
 *            field.
 */
int
xrow_decode_auth(const struct xrow_header *row, struct auth_request *request)
{
	if (row->bodycnt == 0) {
		diag_set(ClientError, ER_INVALID_MSGPACK,
			 "missing request body");
		return -1;
	}
	assert(row->bodycnt == 1);

	const char *body = (const char *) row->body[0].iov_base;
	const char *body_end = body + row->body[0].iov_len;
	assert((body_end - body) > 0);

	const char *pos = body;
	if (mp_typeof(*pos) != MP_MAP)
		goto bad_body;

	memset(request, 0, sizeof(*request));
	uint32_t key_count = mp_decode_map(&pos);
	for (uint32_t k = 0; k < key_count; ++k) {
		if (mp_typeof(*pos) != MP_UINT)
			goto bad_body;
		uint64_t key = mp_decode_uint(&pos);
		/* Remember where the value starts and step over it. */
		const char *value = pos;
		mp_next(&pos);

		if (key == IPROTO_USER_NAME) {
			if (mp_typeof(*value) != MP_STR)
				goto bad_body;
			request->user_name = value;
		} else if (key == IPROTO_TUPLE) {
			if (mp_typeof(*value) != MP_ARRAY)
				goto bad_body;
			request->scramble = value;
		}
		/* Any other key is unknown and ignored. */
	}

	/* Report the first mandatory field that is absent. */
	if (request->user_name == NULL) {
		xrow_on_decode_err(body, body_end, ER_MISSING_REQUEST_FIELD,
				   iproto_key_name(IPROTO_USER_NAME));
		return -1;
	}
	if (request->scramble == NULL) {
		xrow_on_decode_err(body, body_end, ER_MISSING_REQUEST_FIELD,
				   iproto_key_name(IPROTO_TUPLE));
		return -1;
	}
	return 0;

bad_body:
	xrow_on_decode_err(body, body_end, ER_INVALID_MSGPACK, "packet body");
	return -1;
}
/**
 * Encode an AUTH request.
 *
 * The body is allocated on the current fiber's region and holds the
 * user name and, when @a password is not NULL, a ["chap-sha1",
 * scramble] pair built from the salt and the password.
 *
 * @param[out] packet Row to fill; it is zeroed first.
 * @param salt Salt used to prepare the scramble.
 * @param salt_len Salt length; only checked by an assertion.
 * @param login User name, must not be NULL.
 * @param login_len Length of @a login.
 * @param password Password or NULL to omit the scramble.
 * @param password_len Length of @a password.
 *
 * @retval  0 Success.
 * @retval -1 Memory allocation failed, diag is set.
 */
int
xrow_encode_auth(struct xrow_header *packet, const char *salt, size_t salt_len,
		 const char *login, size_t login_len,
		 const char *password, size_t password_len)
{
	assert(login != NULL);
	memset(packet, 0, sizeof(*packet));

	const size_t buf_size = XROW_BODY_LEN_MAX + login_len + SCRAMBLE_SIZE;
	char *buf = (char *) region_alloc(&fiber()->gc, buf_size);
	if (buf == NULL) {
		diag_set(OutOfMemory, buf_size, "region_alloc", "buf");
		return -1;
	}

	/* password can be omitted */
	const bool has_password = password != NULL;
	char *wpos = mp_encode_map(buf, has_password ? 2 : 1);
	wpos = mp_encode_uint(wpos, IPROTO_USER_NAME);
	wpos = mp_encode_str(wpos, login, login_len);
	if (has_password) {
		static const char auth_method[] = "chap-sha1";
		char scramble[SCRAMBLE_SIZE];

		assert(salt_len >= SCRAMBLE_SIZE); /* greetingbuf_decode */
		(void) salt_len;
		scramble_prepare(scramble, salt, password, password_len);

		wpos = mp_encode_uint(wpos, IPROTO_TUPLE);
		wpos = mp_encode_array(wpos, 2);
		wpos = mp_encode_str(wpos, auth_method,
				     sizeof(auth_method) - 1);
		wpos = mp_encode_str(wpos, scramble, SCRAMBLE_SIZE);
	}
	assert(wpos <= buf + buf_size);

	packet->type = IPROTO_AUTH;
	packet->bodycnt = 1;
	packet->body[0].iov_base = buf;
	packet->body[0].iov_len = (wpos - buf);
	return 0;
}
/**
 * Decode an error response and set it as the current error.
 *
 * The error code is taken from the low bits of row->type. The body
 * may carry the error in two forms: IPROTO_ERROR_24 (a plain message
 * string, the obsolete form) and IPROTO_ERROR (unpacked with
 * error_unpack_unsafe()). Once IPROTO_ERROR has been decoded, a later
 * IPROTO_ERROR_24 message is ignored. When the body is missing, is
 * not a map, or IPROTO_ERROR fails to unpack, an error with the
 * message collected so far (possibly empty) is set instead.
 *
 * @param row Row holding the error response.
 */
void
xrow_decode_error(struct xrow_header *row)
{
	uint32_t code = row->type & (IPROTO_TYPE_ERROR - 1);

	char error[DIAG_ERRMSG_MAX] = { 0 };
	const char *pos;
	uint32_t map_size;
	bool is_stack_parsed = false;

	if (row->bodycnt == 0)
		goto error;

	pos = (char *) row->body[0].iov_base;
	if (mp_typeof(*pos) != MP_MAP)
		goto error;
	map_size = mp_decode_map(&pos);
	for (uint32_t i = 0; i < map_size; i++) {
		if (mp_typeof(*pos) != MP_UINT) {
			mp_next(&pos); /* key */
			mp_next(&pos); /* value */
			continue;
		}
		/*
		 * Keep the full width of the key: truncating it to a
		 * byte would make e.g. 0x131 look like 0x31.
		 */
		uint64_t key = mp_decode_uint(&pos);
		if (key == IPROTO_ERROR_24 && mp_typeof(*pos) == MP_STR) {
			/*
			 * Obsolete way of sending error responses.
			 * To be deprecated but still should be supported
			 * to not break backward compatibility.
			 */
			uint32_t len;
			const char *str = mp_decode_str(&pos, &len);
			if (!is_stack_parsed) {
				/* "%.*s" takes an int; clamp to the buffer. */
				size_t max_len = sizeof(error) - 1;
				int prec = (int)(len < max_len ? len : max_len);
				snprintf(error, sizeof(error), "%.*s", prec,
					 str);
				/*
				 * The message comes from the peer and
				 * box_error_set() takes a printf-style
				 * format: never pass it as the format.
				 */
				box_error_set(__FILE__, __LINE__, code, "%s",
					      error);
			}
		} else if (key == IPROTO_ERROR) {
			struct error *e = error_unpack_unsafe(&pos);
			if (e == NULL)
				goto error;
			is_stack_parsed = true;
			diag_set_error(diag_get(), e);
		} else {
			mp_next(&pos);
			continue;
		}
	}
	return;

error:
	box_error_set(__FILE__, __LINE__, code, "%s", error);
}
/**
 * Encode a vote request: the row is zeroed and only its type is set
 * to IPROTO_VOTE.
 *
 * @param[out] row Row to fill.
 */
void
xrow_encode_vote(struct xrow_header *row)
{
	memset(row, 0, sizeof *row);
	row->type = IPROTO_VOTE;
}
/**
 * Decode a ballot from the body of a response row.
 *
 * All ballot fields are reset to defaults first (flags to false,
 * is_booted to true, both vclocks to empty), so fields that are
 * absent from the body keep those defaults. A body without the
 * IPROTO_BALLOT key is not an error.
 *
 * @param row Row to decode.
 * @param[out] ballot Decoded ballot.
 *
 * @retval  0 Success.
 * @retval -1 Missing or malformed body; the error is reported with
 *            xrow_on_decode_err().
 */
int
xrow_decode_ballot(struct xrow_header *row, struct ballot *ballot)
{
	ballot->is_ro_cfg = false;
	ballot->can_lead = false;
	ballot->is_ro = false;
	ballot->is_anon = false;
	ballot->is_booted = true;
	vclock_create(&ballot->vclock);
	/* Do not leave gc_vclock as is when its key is absent. */
	vclock_create(&ballot->gc_vclock);

	const char *start = NULL;
	const char *end = NULL;

	if (row->bodycnt == 0)
		goto err;
	assert(row->bodycnt == 1);

	const char *data = start = (const char *) row->body[0].iov_base;
	end = data + row->body[0].iov_len;
	if (mp_typeof(*data) != MP_MAP)
		goto err;

	/* Find BALLOT key. */
	bool is_found = false;
	uint32_t map_size = mp_decode_map(&data);
	for (uint32_t i = 0; i < map_size; i++) {
		if (mp_typeof(*data) != MP_UINT) {
			mp_next(&data); /* key */
			mp_next(&data); /* value */
			continue;
		}
		if (mp_decode_uint(&data) == IPROTO_BALLOT) {
			is_found = true;
			break;
		}
		/*
		 * Some other integer key: its value has to be skipped
		 * too, or it would be taken for the next key.
		 */
		mp_next(&data);
	}
	if (!is_found || data == end)
		return 0;

	/* Decode BALLOT map. */
	if (mp_typeof(*data) != MP_MAP)
		goto err;
	map_size = mp_decode_map(&data);
	for (uint32_t i = 0; i < map_size; i++) {
		if (mp_typeof(*data) != MP_UINT) {
			mp_next(&data); /* key */
			mp_next(&data); /* value */
			continue;
		}
		/* Full-width key: no aliasing of large unknown keys. */
		uint64_t key = mp_decode_uint(&data);
		switch (key) {
		case IPROTO_BALLOT_IS_RO_CFG:
			if (mp_typeof(*data) != MP_BOOL)
				goto err;
			ballot->is_ro_cfg = mp_decode_bool(&data);
			break;
		case IPROTO_BALLOT_IS_RO:
			if (mp_typeof(*data) != MP_BOOL)
				goto err;
			ballot->is_ro = mp_decode_bool(&data);
			break;
		case IPROTO_BALLOT_IS_ANON:
			if (mp_typeof(*data) != MP_BOOL)
				goto err;
			ballot->is_anon = mp_decode_bool(&data);
			break;
		case IPROTO_BALLOT_VCLOCK:
			if (mp_decode_vclock_ignore0(&data,
						     &ballot->vclock) != 0)
				goto err;
			break;
		case IPROTO_BALLOT_GC_VCLOCK:
			if (mp_decode_vclock_ignore0(&data,
						     &ballot->gc_vclock) != 0)
				goto err;
			break;
		case IPROTO_BALLOT_IS_BOOTED:
			if (mp_typeof(*data) != MP_BOOL)
				goto err;
			ballot->is_booted = mp_decode_bool(&data);
			break;
		case IPROTO_BALLOT_CAN_LEAD:
			if (mp_typeof(*data) != MP_BOOL)
				goto err;
			ballot->can_lead = mp_decode_bool(&data);
			break;
		default:
			/* Unknown key: skip its value. */
			mp_next(&data);
			break;
		}
	}
	return 0;

err:
	xrow_on_decode_err(start, end, ER_INVALID_MSGPACK, "packet body");
	return -1;
}
/**
 * Encode a REGISTER request carrying the instance UUID and a vclock.
 *
 * The body is allocated on the current fiber's region.
 *
 * @param[out] row Row to fill; it is zeroed first.
 * @param instance_uuid UUID stored under IPROTO_INSTANCE_UUID.
 * @param vclock Vclock stored under IPROTO_VCLOCK.
 *
 * @retval  0 Success.
 * @retval -1 Memory allocation failed, diag is set.
 */
int
xrow_encode_register(struct xrow_header *row,
		     const struct tt_uuid *instance_uuid,
		     const struct vclock *vclock)
{
	memset(row, 0, sizeof(*row));

	/* Space for a two-key map: instance UUID and vclock. */
	size_t body_size = mp_sizeof_map(2);
	body_size += mp_sizeof_uint(IPROTO_INSTANCE_UUID);
	body_size += mp_sizeof_str(UUID_STR_LEN);
	body_size += mp_sizeof_uint(IPROTO_VCLOCK);
	body_size += mp_sizeof_vclock_ignore0(vclock);

	char *body = (char *) region_alloc(&fiber()->gc, body_size);
	if (body == NULL) {
		diag_set(OutOfMemory, body_size, "region_alloc", "buf");
		return -1;
	}

	char *wpos = mp_encode_map(body, 2);
	wpos = mp_encode_uint(wpos, IPROTO_INSTANCE_UUID);
	wpos = xrow_encode_uuid(wpos, instance_uuid);
	wpos = mp_encode_uint(wpos, IPROTO_VCLOCK);
	wpos = mp_encode_vclock_ignore0(wpos, vclock);
	assert(wpos <= body + body_size);

	row->type = IPROTO_REGISTER;
	row->bodycnt = 1;
	row->body[0].iov_base = body;
	row->body[0].iov_len = (wpos - body);
	return 0;
}
/**
 * Encode a SUBSCRIBE request.
 *
 * The body is allocated on the current fiber's region and holds the
 * replica set UUID, the instance UUID, the vclock, the server
 * version, the anonymous-replica flag and, when @a id_filter has at
 * least one bit set, the array of the set bit positions.
 *
 * @param[out] row Row to fill; it is zeroed first.
 * @param replicaset_uuid UUID stored under IPROTO_CLUSTER_UUID.
 * @param instance_uuid UUID stored under IPROTO_INSTANCE_UUID.
 * @param vclock Vclock stored under IPROTO_VCLOCK.
 * @param anon Value stored under IPROTO_REPLICA_ANON.
 * @param id_filter Bit mask encoded under IPROTO_ID_FILTER.
 *
 * @retval  0 Success.
 * @retval -1 Memory allocation failed, diag is set.
 */
int
xrow_encode_subscribe(struct xrow_header *row,
		      const struct tt_uuid *replicaset_uuid,
		      const struct tt_uuid *instance_uuid,
		      const struct vclock *vclock, bool anon,
		      uint32_t id_filter)
{
	memset(row, 0, sizeof(*row));

	const size_t body_size = XROW_BODY_LEN_MAX +
				 mp_sizeof_vclock_ignore0(vclock);
	char *body = (char *) region_alloc(&fiber()->gc, body_size);
	if (body == NULL) {
		diag_set(OutOfMemory, body_size, "region_alloc", "buf");
		return -1;
	}

	/* The filter key is only present when the mask is not empty. */
	const int filter_size = bit_count_u32(id_filter);
	const bool has_filter = filter_size != 0;

	char *wpos = mp_encode_map(body, has_filter ? 6 : 5);
	wpos = mp_encode_uint(wpos, IPROTO_CLUSTER_UUID);
	wpos = xrow_encode_uuid(wpos, replicaset_uuid);
	wpos = mp_encode_uint(wpos, IPROTO_INSTANCE_UUID);
	wpos = xrow_encode_uuid(wpos, instance_uuid);
	wpos = mp_encode_uint(wpos, IPROTO_VCLOCK);
	wpos = mp_encode_vclock_ignore0(wpos, vclock);
	wpos = mp_encode_uint(wpos, IPROTO_SERVER_VERSION);
	wpos = mp_encode_uint(wpos, tarantool_version_id());
	wpos = mp_encode_uint(wpos, IPROTO_REPLICA_ANON);
	wpos = mp_encode_bool(wpos, anon);
	if (has_filter) {
		wpos = mp_encode_uint(wpos, IPROTO_ID_FILTER);
		wpos = mp_encode_array(wpos, filter_size);

		/* Emit the position of every set bit of the mask. */
		struct bit_iterator it;
		bit_iterator_init(&it, &id_filter, sizeof(id_filter),
				  true);
		size_t id;
		while ((id = bit_iterator_next(&it)) < VCLOCK_MAX)
			wpos = mp_encode_uint(wpos, id);
	}
	assert(wpos <= body + body_size);

	row->type = IPROTO_SUBSCRIBE;
	row->bodycnt = 1;
	row->body[0].iov_base = body;
	row->body[0].iov_len = (wpos - body);
	return 0;
}
/**
 * Decode the body of a SUBSCRIBE request.
 *
 * Every output pointer may be NULL, in which case the matching key is
 * skipped. Non-NULL outputs are reset first (nil UUIDs, empty vclock,
 * version 0, anon false, empty filter), so keys that are absent from
 * the body leave those defaults. Unknown keys are skipped.
 *
 * @param row Row to decode.
 * @param[out] replicaset_uuid Value of IPROTO_CLUSTER_UUID.
 * @param[out] instance_uuid Value of IPROTO_INSTANCE_UUID.
 * @param[out] vclock Value of IPROTO_VCLOCK.
 * @param[out] version_id Value of IPROTO_SERVER_VERSION.
 * @param[out] anon Value of IPROTO_REPLICA_ANON.
 * @param[out] id_filter Bit mask built from the IPROTO_ID_FILTER
 *             array: bit N is set for every listed id N.
 *
 * @retval  0 Success.
 * @retval -1 Missing or malformed body; the error is reported with
 *            diag_set() or xrow_on_decode_err().
 */
int
xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
		      struct tt_uuid *instance_uuid, struct vclock *vclock,
		      uint32_t *version_id, bool *anon,
		      uint32_t *id_filter)
{
	if (row->bodycnt == 0) {
		diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
		return -1;
	}
	assert(row->bodycnt == 1);
	const char * const data = (const char *) row->body[0].iov_base;
	const char *end = data + row->body[0].iov_len;
	/* Description of what failed to decode, used at decode_err. */
	const char *what = "request body";
	if (mp_typeof(*data) != MP_MAP)
		goto decode_err;

	if (replicaset_uuid != NULL)
		*replicaset_uuid = uuid_nil;
	if (instance_uuid != NULL)
		*instance_uuid = uuid_nil;
	if (vclock != NULL)
		vclock_create(vclock);
	if (version_id != NULL)
		*version_id = 0;
	if (anon != NULL)
		*anon = false;
	if (id_filter != NULL)
		*id_filter = 0;

	const char *d = data;
	uint32_t map_size = mp_decode_map(&d);
	for (uint32_t i = 0; i < map_size; i++) {
		if (mp_typeof(*d) != MP_UINT) {
			mp_next(&d); /* key */
			mp_next(&d); /* value */
			continue;
		}
		/*
		 * Keep the full width of the key: truncating it to a
		 * byte would alias large unknown keys with known ones.
		 */
		uint64_t key = mp_decode_uint(&d);
		switch (key) {
		case IPROTO_CLUSTER_UUID:
			if (replicaset_uuid == NULL) {
				mp_next(&d); /* value */
				break;
			}
			if (xrow_decode_uuid(&d, replicaset_uuid) != 0) {
				what = "UUID";
				goto decode_err;
			}
			break;
		case IPROTO_INSTANCE_UUID:
			if (instance_uuid == NULL) {
				mp_next(&d); /* value */
				break;
			}
			if (xrow_decode_uuid(&d, instance_uuid) != 0) {
				what = "UUID";
				goto decode_err;
			}
			break;
		case IPROTO_VCLOCK:
			if (vclock == NULL) {
				mp_next(&d); /* value */
				break;
			}
			if (mp_decode_vclock_ignore0(&d, vclock) != 0) {
				what = "invalid VCLOCK";
				goto decode_err;
			}
			break;
		case IPROTO_SERVER_VERSION:
			if (version_id == NULL) {
				mp_next(&d); /* value */
				break;
			}
			if (mp_typeof(*d) != MP_UINT) {
				what = "invalid VERSION";
				goto decode_err;
			}
			*version_id = mp_decode_uint(&d);
			break;
		case IPROTO_REPLICA_ANON:
			if (anon == NULL) {
				mp_next(&d); /* value */
				break;
			}
			if (mp_typeof(*d) != MP_BOOL) {
				what = "invalid REPLICA_ANON flag";
				goto decode_err;
			}
			*anon = mp_decode_bool(&d);
			break;
		case IPROTO_ID_FILTER: {
			if (id_filter == NULL) {
				mp_next(&d); /* value */
				break;
			}
			what = "invalid ID_FILTER";
			if (mp_typeof(*d) != MP_ARRAY)
				goto decode_err;
			uint32_t len = mp_decode_array(&d);
			for (uint32_t j = 0; j < len; ++j) {
				if (mp_typeof(*d) != MP_UINT)
					goto decode_err;
				uint64_t val = mp_decode_uint(&d);
				if (val >= VCLOCK_MAX)
					goto decode_err;
				/*
				 * Shift an unsigned one: a signed
				 * `1 << 31` overflows int, which is
				 * undefined behavior.
				 */
				*id_filter |= (uint32_t)1 << val;
			}
			break;
		}
		default:
			mp_next(&d); /* value */
			break;
		}
	}
	return 0;

decode_err:
	xrow_on_decode_err(data, end, ER_INVALID_MSGPACK, what);
	return -1;
}
/**
 * Encode a JOIN request carrying the instance UUID and the server
 * version.
 *
 * The body is allocated on the current fiber's region.
 *
 * @param[out] row Row to fill; it is zeroed first.
 * @param instance_uuid UUID stored under IPROTO_INSTANCE_UUID.
 *
 * @retval  0 Success.
 * @retval -1 Memory allocation failed, diag is set.
 */
int
xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid)
{
	memset(row, 0, sizeof(*row));

	/* Fixed-size buffer; the assertion below checks it is enough. */
	const size_t body_size = 64;
	char *body = (char *) region_alloc(&fiber()->gc, body_size);
	if (body == NULL) {
		diag_set(OutOfMemory, body_size, "region_alloc", "buf");
		return -1;
	}

	char *wpos = mp_encode_map(body, 2);
	/* Greet the remote replica with our replica UUID */
	wpos = mp_encode_uint(wpos, IPROTO_INSTANCE_UUID);
	wpos = xrow_encode_uuid(wpos, instance_uuid);
	wpos = mp_encode_uint(wpos, IPROTO_SERVER_VERSION);
	wpos = mp_encode_uint(wpos, tarantool_version_id());
	assert(wpos <= body + body_size);

	row->type = IPROTO_JOIN;
	row->bodycnt = 1;
	row->body[0].iov_base = body;
	row->body[0].iov_len = (wpos - body);
	return 0;
}
/**
 * Encode an IPROTO_OK response whose body is a one-key map holding
 * a vclock.
 *
 * The body is allocated on the current fiber's region.
 *
 * @param[out] row Row to fill; it is zeroed first.
 * @param vclock Vclock stored under IPROTO_VCLOCK.
 *
 * @retval  0 Success.
 * @retval -1 Memory allocation failed, diag is set.
 */
int
xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock)
{
	memset(row, 0, sizeof(*row));

	/* Add vclock to response body */
	const size_t body_size = 8 + mp_sizeof_vclock_ignore0(vclock);
	char *body = (char *) region_alloc(&fiber()->gc, body_size);
	if (body == NULL) {
		diag_set(OutOfMemory, body_size, "region_alloc", "buf");
		return -1;
	}

	char *wpos = mp_encode_map(body, 1);
	wpos = mp_encode_uint(wpos, IPROTO_VCLOCK);
	wpos = mp_encode_vclock_ignore0(wpos, vclock);
	assert(wpos <= body + body_size);

	row->type = IPROTO_OK;
	row->bodycnt = 1;
	row->body[0].iov_base = body;
	row->body[0].iov_len = (wpos - body);
	return 0;
}
/**
 * Encode an IPROTO_OK response to SUBSCRIBE: a two-key map holding
 * a vclock and the replica set UUID.
 *
 * The body is allocated on the current fiber's region.
 *
 * @param[out] row Row to fill; it is zeroed first.
 * @param replicaset_uuid UUID stored under IPROTO_CLUSTER_UUID.
 * @param vclock Vclock stored under IPROTO_VCLOCK.
 *
 * @retval  0 Success.
 * @retval -1 Memory allocation failed, diag is set.
 */
int
xrow_encode_subscribe_response(struct xrow_header *row,
			       const struct tt_uuid *replicaset_uuid,
			       const struct vclock *vclock)
{
	memset(row, 0, sizeof(*row));

	/* Space for a two-key map: vclock and replica set UUID. */
	size_t body_size = mp_sizeof_map(2);
	body_size += mp_sizeof_uint(IPROTO_VCLOCK);
	body_size += mp_sizeof_vclock_ignore0(vclock);
	body_size += mp_sizeof_uint(IPROTO_CLUSTER_UUID);
	body_size += mp_sizeof_str(UUID_STR_LEN);

	char *body = (char *) region_alloc(&fiber()->gc, body_size);
	if (body == NULL) {
		diag_set(OutOfMemory, body_size, "region_alloc", "buf");
		return -1;
	}

	char *wpos = mp_encode_map(body, 2);
	wpos = mp_encode_uint(wpos, IPROTO_VCLOCK);
	wpos = mp_encode_vclock_ignore0(wpos, vclock);
	wpos = mp_encode_uint(wpos, IPROTO_CLUSTER_UUID);
	wpos = xrow_encode_uuid(wpos, replicaset_uuid);
	assert(wpos <= body + body_size);

	row->type = IPROTO_OK;
	row->bodycnt = 1;
	row->body[0].iov_base = body;
	row->body[0].iov_len = (wpos - body);
	return 0;
}
/**
 * Encode a body-less IPROTO_OK row that carries a replica id and
 * a timestamp in its header; all other fields are zeroed.
 *
 * @param[out] row Row to fill.
 * @param replica_id Value stored in row->replica_id.
 * @param tm Value stored in row->tm.
 */
void
xrow_encode_timestamp(struct xrow_header *row, uint32_t replica_id, double tm)
{
	memset(row, 0, sizeof *row);
	row->tm = tm;
	row->replica_id = replica_id;
	row->type = IPROTO_OK;
}
/**
 * Encode a row that consists of a type only: the row is zeroed and
 * its type is set to @a type.
 *
 * @param[out] row Row to fill.
 * @param type Value stored in row->type.
 */
void
xrow_encode_type(struct xrow_header *row, uint16_t type)
{
	memset(row, 0, sizeof *row);
	row->type = type;
}
/**
 * Format a greeting into @a greetingbuf.
 *
 * The greeting is IPROTO_GREETING_SIZE bytes long and consists of
 * two halves, each padded with spaces and ending with '\n':
 * "Tarantool <major>.<minor>.<patch> (Binary) <uuid>" and the
 * base64-encoded salt.
 *
 * @param[out] greetingbuf Destination buffer.
 * @param version_id Packed version to print.
 * @param uuid UUID printed after the protocol name.
 * @param salt Salt to encode into the second half.
 * @param salt_len Length of @a salt.
 */
void
greeting_encode(char *greetingbuf, uint32_t version_id,
		const struct tt_uuid *uuid, const char *salt, uint32_t salt_len)
{
	int half = IPROTO_GREETING_SIZE / 2;
	char *line1 = greetingbuf;
	char *line2 = greetingbuf + half;

	/* First half: product name, version, protocol and UUID. */
	int used = snprintf(line1, half + 1, "Tarantool %u.%u.%u (Binary) ",
			    version_id_major(version_id),
			    version_id_minor(version_id),
			    version_id_patch(version_id));
	assert(used + UUID_STR_LEN < half);
	tt_uuid_to_string(uuid, line1 + used);
	used += UUID_STR_LEN;
	memset(line1 + used, ' ', half - used - 1);
	line1[half - 1] = '\n';

	/* Second half: base64-encoded salt. */
	assert(base64_bufsize(salt_len, 0) + 1 < half);
	used = base64_encode(salt, salt_len, line2, half - 1, 0);
	assert(used < half);
	memset(line2 + used, ' ', half - used - 1);
	greetingbuf[IPROTO_GREETING_SIZE - 1] = '\n';
}
/**
 * Parse a greeting of IPROTO_GREETING_SIZE bytes.
 *
 * The first half must start with "Tarantool " followed by a version
 * and, optionally, a protocol name in parentheses. For the "Binary"
 * protocol the UUID (versions >= 1.6.7) and the base64-encoded salt
 * from the second half are decoded too. For any other protocol name
 * parsing stops after the name and 0 is returned.
 *
 * @param greetingbuf Greeting to parse.
 * @param[out] greeting Parsed greeting; zeroed once the basic
 *             structure check has passed.
 *
 * @retval  0 Success.
 * @retval -1 The greeting is malformed.
 */
int
greeting_decode(const char *greetingbuf, struct greeting *greeting)
{
	/* Check basic structure - magic string and \n delimiters */
	if (memcmp(greetingbuf, "Tarantool ", strlen("Tarantool ")) != 0 ||
	    greetingbuf[IPROTO_GREETING_SIZE / 2 - 1] != '\n' ||
	    greetingbuf[IPROTO_GREETING_SIZE - 1] != '\n')
		return -1;
	memset(greeting, 0, sizeof(*greeting));
	int h = IPROTO_GREETING_SIZE / 2;
	const char *pos = greetingbuf + strlen("Tarantool ");
	const char *end = greetingbuf + h;
	for (; pos < end && *pos == ' '; ++pos); /* skip spaces */

	/* Extract a version string - a string until ' ' */
	char version[20];
	const char *vend = (const char *) memchr(pos, ' ', end - pos);
	if (vend == NULL || (size_t)(vend - pos) >= sizeof(version))
		return -1;
	memcpy(version, pos, vend - pos);
	version[vend - pos] = '\0';
	pos = vend + 1;
	for (; pos < end && *pos == ' '; ++pos); /* skip spaces */

	/* Parse a version string - 1.6.6-83-gc6b2129 or 1.6.7 */
	unsigned major, minor, patch;
	if (sscanf(version, "%u.%u.%u", &major, &minor, &patch) != 3)
		return -1;
	greeting->version_id = version_id(major, minor, patch);

	if (*pos == '(') {
		/*
		 * Extract protocol name - a string between (parentheses).
		 * The first line ends with '\n', which stops the space
		 * skipping above before end, hence pos < end. The search
		 * starts at pos + 1, so only end - pos - 1 bytes of the
		 * first line are left to look at.
		 */
		assert(pos < end);
		vend = (const char *) memchr(pos + 1, ')', end - pos - 1);
		if (vend == NULL ||
		    (vend - pos - 1) > GREETING_PROTOCOL_LEN_MAX)
			return -1;
		memcpy(greeting->protocol, pos + 1, vend - pos - 1);
		greeting->protocol[vend - pos - 1] = '\0';
		pos = vend + 1;
		/* Parse protocol name - Binary or Lua console. */
		if (strcmp(greeting->protocol, "Binary") != 0)
			return 0;

		if (greeting->version_id >= version_id(1, 6, 7)) {
			if (*(pos++) != ' ')
				return -1;
			for (; pos < end && *pos == ' '; ++pos); /* spaces */
			if (end - pos < UUID_STR_LEN)
				return -1;
			if (tt_uuid_from_strl(pos, UUID_STR_LEN,
					      &greeting->uuid))
				return -1;
		}
	} else if (greeting->version_id < version_id(1, 6, 7)) {
		/* Tarantool < 1.6.7 doesn't add "(Binary)" to greeting */
		strcpy(greeting->protocol, "Binary");
	} else {
		return -1; /* Sorry, don't want to parse this greeting */
	}

	/* Decode salt for binary protocol */
	greeting->salt_len = base64_decode(greetingbuf + h, h - 1,
					   greeting->salt,
					   sizeof(greeting->salt));
	if (greeting->salt_len < SCRAMBLE_SIZE ||
	    greeting->salt_len >= (uint32_t)h)
		return -1;
	return 0;
}