diff --git a/src/box/replica.cc b/src/box/replica.cc index 03bd587376171998361f45dd9b557b656a38bca1..23ca0e26cbbe171b43f1c7c5d0b139e94ce38ce7 100644 --- a/src/box/replica.cc +++ b/src/box/replica.cc @@ -37,14 +37,9 @@ #include "fiber.h" #include "coio_buf.h" #include "recovery.h" -#include "tarantool.h" -#include "scoped_guard.h" -#include "iproto.h" #include "iproto_constants.h" #include "msgpuck/msgpuck.h" #include "box/cluster.h" -#include "scramble.h" -#include "third_party/base64.h" static void remote_read_row(struct ev_io *coio, struct iobuf *iobuf, @@ -116,39 +111,6 @@ remote_read_row_fd(int sock, struct iproto_header *row) iproto_header_decode(row, &data, data + len); } -static int -request_encode_auth(const char *greeting, const char *login, - const char *password, struct iovec *iov) -{ - uint32_t login_len = strlen(login); - uint32_t password_len = strlen(password); - - enum { PACKET_LEN_MAX = 128 }; - size_t buf_size = PACKET_LEN_MAX + login_len + SCRAMBLE_SIZE; - char *buf = (char *) region_alloc(&fiber()->gc, buf_size); - - char salt[SCRAMBLE_SIZE]; - char scramble[SCRAMBLE_SIZE]; - if (base64_decode(greeting + 64, SCRAMBLE_BASE64_SIZE, salt, - SCRAMBLE_SIZE) != SCRAMBLE_SIZE) - panic("invalid salt: %64s", greeting + 64); - scramble_prepare(scramble, salt, password, password_len); - - char *d = buf; - d = mp_encode_map(d, 2); - d = mp_encode_uint(d, IPROTO_USER_NAME); - d = mp_encode_str(d, login, login_len); - d = mp_encode_uint(d, IPROTO_TUPLE); - d = mp_encode_array(d, 2); - d = mp_encode_str(d, "chap-sha1", strlen("chap-sha1")); - d = mp_encode_str(d, scramble, SCRAMBLE_SIZE); - - assert(d <= buf + buf_size); - iov[0].iov_base = buf; - iov[0].iov_len = (d - buf); - return 1; -} - void replica_bootstrap(struct recovery_state *r) { @@ -173,12 +135,8 @@ replica_bootstrap(struct recovery_state *r) if (*r->remote.uri.login) { /* Authenticate */ - memset(&row, sizeof(row), 0); - row.type = IPROTO_AUTH; - row.bodycnt = - request_encode_auth(greeting, r->remote.uri.login, - r->remote.uri.password, - row.body); + iproto_encode_auth(&row, greeting, r->remote.uri.login, + r->remote.uri.password); int iovcnt = iproto_row_encode(&row, iov); sio_writev_all(master, iov, iovcnt); remote_read_row_fd(master, &row); @@ -188,21 +146,8 @@ replica_bootstrap(struct recovery_state *r) } /* Send JOIN request */ - memset(&row, 0, sizeof(struct iproto_header)); - row.type = IPROTO_JOIN; + iproto_encode_join(&row, &recovery_state->server_uuid); row.sync = sync; - - char buf[128]; - char *data = buf; - data = mp_encode_map(data, 1); - data = mp_encode_uint(data, IPROTO_SERVER_UUID); - /* Greet the remote server with our server UUID */ - data = iproto_encode_uuid(data, &recovery_state->server_uuid); - - assert(data <= buf + sizeof(buf)); - row.body[0].iov_base = buf; - row.body[0].iov_len = (data - buf); - row.bodycnt = 1; int iovcnt = iproto_row_encode(&row, iov); sio_writev_all(master, iov, iovcnt); @@ -242,6 +187,8 @@ remote_connect(struct recovery_state *r, struct ev_io *coio, struct iobuf *iobuf, const char **err) { char greeting[IPROTO_GREETING_SIZE]; + struct iovec iov[IPROTO_ROW_IOVMAX]; + evio_socket(coio, AF_INET, SOCK_STREAM, IPPROTO_TCP); struct port_uri *uri = &r->remote.uri; @@ -254,11 +201,7 @@ remote_connect(struct recovery_state *r, struct ev_io *coio, /* Authenticate */ say_debug("authenticating..."); struct iproto_header row; - memset(&row, sizeof(row), 0); - row.type = IPROTO_AUTH; - row.bodycnt = request_encode_auth(greeting, uri->login, - uri->password, row.body); - struct iovec iov[IPROTO_ROW_IOVMAX]; + iproto_encode_auth(&row, greeting, uri->login, uri->password); int iovcnt = iproto_row_encode(&row, iov); coio_writev(coio, iov, iovcnt, 0); remote_read_row(coio, iobuf, &row); @@ -269,30 +212,7 @@ remote_connect(struct recovery_state *r, struct ev_io *coio, /* Send SUBSCRIBE request */ struct iproto_header row; - memset(&row, 0, sizeof(row)); - row.type = IPROTO_SUBSCRIBE; - - uint32_t cluster_size = vclock_size(&r->vclock); - size_t size = 128 + cluster_size * - (mp_sizeof_uint(UINT32_MAX) + mp_sizeof_uint(UINT64_MAX)); - char *buf = (char *) region_alloc(&fiber()->gc, size); - char *data = buf; - data = mp_encode_map(data, 3); - data = mp_encode_uint(data, IPROTO_CLUSTER_UUID); - data = iproto_encode_uuid(data, &cluster_id); - data = mp_encode_uint(data, IPROTO_SERVER_UUID); - data = iproto_encode_uuid(data, &recovery_state->server_uuid); - data = mp_encode_uint(data, IPROTO_VCLOCK); - data = mp_encode_map(data, cluster_size); - vclock_foreach(&r->vclock, server) { - data = mp_encode_uint(data, server.id); - data = mp_encode_uint(data, server.lsn); - } - assert(data <= buf + size); - row.body[0].iov_base = buf; - row.body[0].iov_len = (data - buf); - row.bodycnt = 1; - struct iovec iov[IPROTO_ROW_IOVMAX]; + iproto_encode_subscribe(&row, &cluster_id, &r->server_uuid, &r->vclock); int iovcnt = iproto_row_encode(&row, iov); coio_writev(coio, iov, iovcnt, 0); diff --git a/src/box/replication.cc b/src/box/replication.cc index 856e802ce818b02fc90eafe2b2c6f1d63e874016..81c8300b78790e7b05e3cd1767f06939ef0cdb82 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -46,8 +46,6 @@ #include "log_io.h" #include "evio.h" #include "iproto_constants.h" -#include "msgpuck/msgpuck.h" -#include "scoped_guard.h" #include "box/cluster.h" #include "box/schema.h" #include "box/vclock.h" @@ -660,31 +658,11 @@ replication_relay_join(struct recovery_state *r) recover_snap(r); /* Send response to JOIN command = end of stream */ - struct iproto_header packet; - memset(&packet, 0, sizeof(packet)); - packet.type = IPROTO_JOIN; - packet.sync = relay.sync; - - /* Add vclock to response body */ - uint32_t cluster_size = vclock_size(&r->vclock); - size_t size = 128 + cluster_size * - (mp_sizeof_uint(UINT32_MAX) + mp_sizeof_uint(UINT64_MAX)); - char *buf = (char *) region_alloc(&fiber()->gc, size); - char *data = buf; - data = mp_encode_map(data, 1); - data = mp_encode_uint(data, IPROTO_VCLOCK); - data = mp_encode_map(data, cluster_size); - vclock_foreach(&r->vclock, server) { - data = mp_encode_uint(data, server.id); - data = mp_encode_uint(data, server.lsn); - } - assert(data <= buf + size); - packet.body[0].iov_base = buf; - packet.body[0].iov_len = (data - buf); - packet.bodycnt = 1; - + struct iproto_header row; + iproto_encode_eos(&row, &r->vclock); + row.sync = relay.sync; struct iovec iov[IPROTO_ROW_IOVMAX]; - int iovcnt = iproto_row_encode(&packet, iov); + int iovcnt = iproto_row_encode(&row, iov); sio_writev_all(relay.sock, iov, iovcnt); say_info("snapshot sent"); diff --git a/src/iproto_constants.cc b/src/iproto_constants.cc index 90aa071f61f0924df0bd1c5f355f161745db9790..4ae0754acb0d871092acc4c30d61f3e95fd1cff8 100644 --- a/src/iproto_constants.cc +++ b/src/iproto_constants.cc @@ -33,6 +33,8 @@ #include "crc32.h" #include "tt_uuid.h" #include "box/vclock.h" +#include "scramble.h" +#include "third_party/base64.h" const unsigned char iproto_key_type[IPROTO_KEY_MAX] = { @@ -316,6 +318,42 @@ iproto_row_encode(const struct iproto_header *row, return iovcnt; } +void +iproto_encode_auth(struct iproto_header *packet, const char *greeting, + const char *login, const char *password) +{ + memset(packet, sizeof(*packet), 0); + + uint32_t login_len = strlen(login); + uint32_t password_len = strlen(password); + + enum { PACKET_LEN_MAX = 128 }; + size_t buf_size = PACKET_LEN_MAX + login_len + SCRAMBLE_SIZE; + char *buf = (char *) region_alloc(&fiber()->gc, buf_size); + + char salt[SCRAMBLE_SIZE]; + char scramble[SCRAMBLE_SIZE]; + if (base64_decode(greeting + 64, SCRAMBLE_BASE64_SIZE, salt, + SCRAMBLE_SIZE) != SCRAMBLE_SIZE) + panic("invalid salt: %64s", greeting + 64); + scramble_prepare(scramble, salt, password, password_len); + + char *d = buf; + d = mp_encode_map(d, 2); + d = mp_encode_uint(d, IPROTO_USER_NAME); + d = mp_encode_str(d, login, login_len); + d = mp_encode_uint(d, IPROTO_TUPLE); + d = mp_encode_array(d, 2); + d = mp_encode_str(d, "chap-sha1", strlen("chap-sha1")); + d = mp_encode_str(d, scramble, SCRAMBLE_SIZE); + + assert(d <= buf + buf_size); + packet->body[0].iov_base = buf; + packet->body[0].iov_len = (d - buf); + packet->bodycnt = 1; + packet->type = IPROTO_AUTH; +} + void iproto_decode_error(struct iproto_header *row) { @@ -358,15 +396,44 @@ iproto_decode_error(struct iproto_header *row) } void -iproto_decode_subscribe(struct iproto_header *packet, - struct tt_uuid *cluster_uuid, +iproto_encode_subscribe(struct iproto_header *row, + const struct tt_uuid *cluster_uuid, + const struct tt_uuid *server_uuid, + const struct vclock *vclock) +{ + memset(row, 0, sizeof(*row)); + uint32_t cluster_size = vclock_size(vclock); + size_t size = 128 + cluster_size * + (mp_sizeof_uint(UINT32_MAX) + mp_sizeof_uint(UINT64_MAX)); + char *buf = (char *) region_alloc(&fiber()->gc, size); + char *data = buf; + data = mp_encode_map(data, 3); + data = mp_encode_uint(data, IPROTO_CLUSTER_UUID); + data = iproto_encode_uuid(data, cluster_uuid); + data = mp_encode_uint(data, IPROTO_SERVER_UUID); + data = iproto_encode_uuid(data, server_uuid); + data = mp_encode_uint(data, IPROTO_VCLOCK); + data = mp_encode_map(data, cluster_size); + vclock_foreach(vclock, server) { + data = mp_encode_uint(data, server.id); + data = mp_encode_uint(data, server.lsn); + } + assert(data <= buf + size); + row->body[0].iov_base = buf; + row->body[0].iov_len = (data - buf); + row->bodycnt = 1; + row->type = IPROTO_SUBSCRIBE; +} + +void +iproto_decode_subscribe(struct iproto_header *row, struct tt_uuid *cluster_uuid, struct tt_uuid *server_uuid, struct vclock *vclock) { - if (packet->bodycnt == 0) + if (row->bodycnt == 0) tnt_raise(ClientError, ER_INVALID_MSGPACK, "request body"); - assert(packet->bodycnt == 1); - const char *data = (const char *) packet->body[0].iov_base; - const char *end = data + packet->body[0].iov_len; + assert(row->bodycnt == 1); + const char *data = (const char *) row->body[0].iov_base; + const char *end = data + row->body[0].iov_len; const char *d = data; if (mp_check(&d, end) != 0 || mp_typeof(*data) != MP_MAP) tnt_raise(ClientError, ER_INVALID_MSGPACK, "request body"); @@ -425,3 +492,48 @@ iproto_decode_subscribe(struct iproto_header *packet, vclock_follow(vclock, id, lsn); } } + +void +iproto_encode_join(struct iproto_header *row, const struct tt_uuid *server_uuid) +{ + memset(row, 0, sizeof(*row)); + + size_t size = 64; + char *buf = (char *) region_alloc(&fiber()->gc, size); + char *data = buf; + data = mp_encode_map(data, 1); + data = mp_encode_uint(data, IPROTO_SERVER_UUID); + /* Greet the remote server with our server UUID */ + data = iproto_encode_uuid(data, server_uuid); + assert(data <= buf + size); + + row->body[0].iov_base = buf; + row->body[0].iov_len = (data - buf); + row->bodycnt = 1; + row->type = IPROTO_JOIN; +} + +void +iproto_encode_eos(struct iproto_header *row, const struct vclock *vclock) +{ + memset(row, 0, sizeof(*row)); + + /* Add vclock to response body */ + uint32_t cluster_size = vclock_size(vclock); + size_t size = 8 + cluster_size * + (mp_sizeof_uint(UINT32_MAX) + mp_sizeof_uint(UINT64_MAX)); + char *buf = (char *) region_alloc(&fiber()->gc, size); + char *data = buf; + data = mp_encode_map(data, 1); + data = mp_encode_uint(data, IPROTO_VCLOCK); + data = mp_encode_map(data, cluster_size); + vclock_foreach(vclock, server) { + data = mp_encode_uint(data, server.id); + data = mp_encode_uint(data, server.lsn); + } + assert(data <= buf + size); + row->body[0].iov_base = buf; + row->body[0].iov_len = (data - buf); + row->bodycnt = 1; + row->type = IPROTO_JOIN; +} diff --git a/src/iproto_constants.h b/src/iproto_constants.h index d8fa8e6eb5a039d08fd892a45917027525a7303c..f768cde68654a987c378e96c684adec113f4d5f0 100644 --- a/src/iproto_constants.h +++ b/src/iproto_constants.h @@ -184,41 +184,84 @@ iproto_header_encode(const struct iproto_header *header, int iproto_row_encode(const struct iproto_header *row, struct iovec *out); +/** + * \brief Decode ERROR and re-throw it as ClientError exception + * \param row + */ void iproto_decode_error(struct iproto_header *row); +/** + * \brief Encode AUTH command + * \param[out] row + * \param greeting - IPROTO greeting + * \param login - user login + * \param password - user password + */ +void +iproto_encode_auth(struct iproto_header *row, const char *greeting, + const char *login, const char *password); + +/** + * \brief Encode SUBSCRIBE command + * \param row[out] + * \param cluster_uuid cluster uuid + * \param server_uuid server uuid + * \param vclock server vclock + */ +void +iproto_encode_subscribe(struct iproto_header *row, + const struct tt_uuid *cluster_uuid, + const struct tt_uuid *server_uuid, + const struct vclock *vclock); + /** * \brief Decode SUBSCRIBE command - * \param packet + * \param row * \param[out] cluster_uuid * \param[out] server_uuid * \param[out] vclock */ void -iproto_decode_subscribe(struct iproto_header *packet, - struct tt_uuid *cluster_uuid, +iproto_decode_subscribe(struct iproto_header *row, struct tt_uuid *cluster_uuid, struct tt_uuid *server_uuid, struct vclock *vclock); +/** + * \brief Encode JOIN command + * \param[out] row + * \param server_uuid + */ +void +iproto_encode_join(struct iproto_header *row, const struct tt_uuid *server_uuid); + /** * \brief Decode JOIN command - * \param packet + * \param row * \param[out] server_uuid */ static inline void -iproto_decode_join(struct iproto_header *packet, struct tt_uuid *server_uuid) +iproto_decode_join(struct iproto_header *row, struct tt_uuid *server_uuid) { - return iproto_decode_subscribe(packet, NULL, server_uuid, NULL); + return iproto_decode_subscribe(row, NULL, server_uuid, NULL); } /** - * \brief Decode end of stream packet (a response to JOIN packet) - * \param packet + * \brief Encode end of stream command (a response to JOIN command) + * \param row[out] + * \param vclock + */ +void +iproto_encode_eos(struct iproto_header *row, const struct vclock *vclock); + +/** + * \brief Decode end of stream command (a response to JOIN command) + * \param row * \param[out] vclock */ static inline void -iproto_decode_eos(struct iproto_header *packet, struct vclock *vclock) +iproto_decode_eos(struct iproto_header *row, struct vclock *vclock) { - return iproto_decode_subscribe(packet, NULL, NULL, vclock); + return iproto_decode_subscribe(row, NULL, NULL, vclock); } #if defined(__cplusplus)