diff --git a/src/box/applier.cc b/src/box/applier.cc index 0fc42cc1e5ea3e0d66f1d63e4a91b10080a35294..0b674d0e862e8991b44986289d583252a8efa6c9 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -175,14 +175,13 @@ applier_connect(struct applier *applier) /* Authenticate */ applier_set_state(applier, APPLIER_AUTH); struct xrow_header row; - xrow_encode_auth(&row, greeting.salt, greeting.salt_len, uri->login, - uri->login_len, uri->password, - uri->password_len); + xrow_encode_auth_xc(&row, greeting.salt, greeting.salt_len, uri->login, + uri->login_len, uri->password, uri->password_len); coio_write_xrow(coio, &row); coio_read_xrow(coio, &iobuf->in, &row); applier->last_row_time = ev_now(loop()); if (row.type != IPROTO_OK) - xrow_decode_error(&row); /* auth failed */ + xrow_decode_error_xc(&row); /* auth failed */ done: /* auth succeeded */ @@ -200,7 +199,7 @@ applier_join(struct applier *applier) struct ev_io *coio = &applier->io; struct iobuf *iobuf = applier->iobuf; struct xrow_header row; - xrow_encode_join(&row, &INSTANCE_UUID); + xrow_encode_join_xc(&row, &INSTANCE_UUID); coio_write_xrow(coio, &row); /** @@ -211,7 +210,7 @@ applier_join(struct applier *applier) /* Decode JOIN response */ coio_read_xrow(coio, &iobuf->in, &row); if (iproto_type_is_error(row.type)) { - return xrow_decode_error(&row); /* re-throw error */ + xrow_decode_error_xc(&row); /* re-throw error */ } else if (row.type != IPROTO_OK) { tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE, (uint32_t) row.type); @@ -249,7 +248,7 @@ applier_join(struct applier *applier) } break; /* end of stream */ } else if (iproto_type_is_error(row.type)) { - xrow_decode_error(&row); /* rethrow error */ + xrow_decode_error_xc(&row); /* rethrow error */ } else { tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE, (uint32_t) row.type); @@ -282,7 +281,7 @@ applier_join(struct applier *applier) */ break; /* end of stream */ } else if (iproto_type_is_error(row.type)) { - xrow_decode_error(&row); /* rethrow error */ + xrow_decode_error_xc(&row); /* rethrow error */ } else { tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE, (uint32_t) row.type); @@ -308,8 +307,8 @@ applier_subscribe(struct applier *applier) struct iobuf *iobuf = applier->iobuf; struct xrow_header row; - xrow_encode_subscribe(&row, &REPLICASET_UUID, &INSTANCE_UUID, - &replicaset_vclock); + xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID, + &replicaset_vclock); coio_write_xrow(coio, &row); applier_set_state(applier, APPLIER_FOLLOW); @@ -319,7 +318,7 @@ applier_subscribe(struct applier *applier) if (applier->version_id >= version_id(1, 6, 7)) { coio_read_xrow(coio, &iobuf->in, &row); if (iproto_type_is_error(row.type)) { - return xrow_decode_error(&row); /* error */ + xrow_decode_error_xc(&row); /* error */ } else if (row.type != IPROTO_OK) { tnt_raise(ClientError, ER_PROTOCOL, "Invalid response to SUBSCRIBE"); @@ -352,7 +351,7 @@ applier_subscribe(struct applier *applier) applier->last_row_time = ev_now(loop()); if (iproto_type_is_error(row.type)) - xrow_decode_error(&row); /* error */ + xrow_decode_error_xc(&row); /* error */ /* Replication request. */ if (row.replica_id == REPLICA_ID_NIL || row.replica_id >= VCLOCK_MAX) { diff --git a/src/box/box.cc b/src/box/box.cc index a29436d4635fb399be10d89bd093186d7a818ba0..2231a2a5817e9a96f56d2f00807a17d7b4aa1ead 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -288,7 +288,7 @@ apply_row(struct xstream *stream, struct xrow_header *row) { assert(row->bodycnt == 1); /* always 1 for read */ (void) stream; - struct request *request = xrow_decode_request(row); + struct request *request = xrow_decode_request_xc(row); struct space *space = space_cache_find(request->space_id); process_rw(request, space, NULL); } @@ -326,7 +326,7 @@ static void apply_initial_join_row(struct xstream *stream, struct xrow_header *row) { (void) stream; - struct request *request = xrow_decode_request(row); + struct request *request = xrow_decode_request_xc(row); struct space *space = space_cache_find(request->space_id); /* no access checks here - applier always works with admin privs */ space->handler->applyInitialJoinRow(space, request); @@ -1106,7 +1106,7 @@ box_process_auth(struct request *request, struct obuf *out) uint32_t len = mp_decode_strl(&user); authenticate(user, len, request->tuple, request->tuple_end); assert(request->header != NULL); - iproto_reply_ok(out, request->header->sync, ::schema_version); + iproto_reply_ok_xc(out, request->header->sync, ::schema_version); } void @@ -1203,7 +1203,7 @@ box_process_join(struct ev_io *io, struct xrow_header *header) /* Respond to JOIN request with start_vclock. */ struct xrow_header row; - xrow_encode_vclock(&row, &start_vclock); + xrow_encode_vclock_xc(&row, &start_vclock); row.sync = header->sync; coio_write_xrow(io, &row); @@ -1231,7 +1231,7 @@ box_process_join(struct ev_io *io, struct xrow_header *header) wal_checkpoint(&stop_vclock, false); /* Send end of initial stage data marker */ - xrow_encode_vclock(&row, &stop_vclock); + xrow_encode_vclock_xc(&row, &stop_vclock); row.sync = header->sync; coio_write_xrow(io, &row); @@ -1245,7 +1245,7 @@ box_process_join(struct ev_io *io, struct xrow_header *header) /* Send end of WAL stream marker */ struct vclock current_vclock; wal_checkpoint(¤t_vclock, false); - xrow_encode_vclock(&row, ¤t_vclock); + xrow_encode_vclock_xc(&row, ¤t_vclock); row.sync = header->sync; coio_write_xrow(io, &row); } @@ -1262,8 +1262,8 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header) struct tt_uuid replicaset_uuid = uuid_nil, replica_uuid = uuid_nil; struct vclock replica_clock; vclock_create(&replica_clock); - xrow_decode_subscribe(header, &replicaset_uuid, &replica_uuid, - &replica_clock); + xrow_decode_subscribe_xc(header, &replicaset_uuid, &replica_uuid, + &replica_clock); /* Forbid connection to itself */ if (tt_uuid_is_equal(&replica_uuid, &INSTANCE_UUID)) @@ -1306,7 +1306,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header) struct xrow_header row; struct vclock current_vclock; wal_checkpoint(¤t_vclock, true); - xrow_encode_vclock(&row, ¤t_vclock); + xrow_encode_vclock_xc(&row, ¤t_vclock); /* * Identify the message with the replica id of this * instance, this is the only way for a replica to find diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 60ac613dbb41bb0ed102b1321dd8f9bd3a9fec59..ce6372693f8b51aa7a529dfa8cc33cf67f742b4c 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -966,8 +966,8 @@ tx_process_misc(struct cmsg *m) box_process_auth(&msg->request, out); break; case IPROTO_PING: - iproto_reply_ok(out, msg->header.sync, - ::schema_version); + iproto_reply_ok_xc(out, msg->header.sync, + ::schema_version); break; default: unreachable(); diff --git a/src/box/memtx_engine.cc b/src/box/memtx_engine.cc index bd45a357aedf081b19616284c6f73c8b1300d77b..0f8e8774520456d06a4f19a4238dc9110c1caa44 100644 --- a/src/box/memtx_engine.cc +++ b/src/box/memtx_engine.cc @@ -198,7 +198,7 @@ MemtxEngine::recoverSnapshotRow(struct xrow_header *row) (uint32_t) row->type); } - struct request *request = xrow_decode_request(row); + struct request *request = xrow_decode_request_xc(row); struct space *space = space_cache_find(request->space_id); /* memtx snapshot must contain only memtx spaces */ if (space->handler->engine != this) diff --git a/src/box/xlog.cc b/src/box/xlog.cc index 951262ed19def7a68b8a78aa8963d6bb9079c3db..8075972fd2387685a157be4207475f1ed603b7ea 100644 --- a/src/box/xlog.cc +++ b/src/box/xlog.cc @@ -1176,11 +1176,15 @@ xlog_write_row(struct xlog *log, const struct xrow_header *packet) } } + struct obuf_svp svp = obuf_create_svp(&log->obuf); size_t page_offset = obuf_size(&log->obuf); /** encode row into iovec */ struct iovec iov[XROW_IOVMAX]; int iovcnt = xrow_header_encode(packet, iov, 0); - struct obuf_svp svp = obuf_create_svp(&log->obuf); + if (iovcnt < 0) { + obuf_rollback_to_svp(&log->obuf, &svp); + return -1; + } for (int i = 0; i < iovcnt; ++i) { struct errinj *inj = errinj(ERRINJ_WAL_WRITE_PARTIAL, ERRINJ_INT); diff --git a/src/box/xrow.cc b/src/box/xrow.cc index 483fca55a842ddced2b9cf491c9d92549a19df88..1d24097b17963f8cae5a1a0f6cbcbf10ffeac90f 100644 --- a/src/box/xrow.cc +++ b/src/box/xrow.cc @@ -113,17 +113,19 @@ xrow_header_decode(struct xrow_header *header, const char **pos, /** * @pre pos points at a valid msgpack */ -void +static inline int xrow_decode_uuid(const char **pos, struct tt_uuid *out) { if (mp_typeof(**pos) != MP_STR) { error: - tnt_raise(ClientError, ER_INVALID_MSGPACK, "UUID"); + diag_set(ClientError, ER_INVALID_MSGPACK, "UUID"); + return -1; } uint32_t len = mp_decode_strl(pos); if (tt_uuid_from_strl(*pos, len, out) != 0) goto error; *pos += len; + return 0; } int @@ -183,7 +185,7 @@ xrow_header_encode(const struct xrow_header *header, struct iovec *out, return 1 + header->bodycnt; /* new iovcnt */ } -char * +static inline char * xrow_encode_uuid(char *pos, const struct tt_uuid *in) { return mp_encode_str(pos, tt_uuid_str(in), UUID_STR_LEN); @@ -251,12 +253,18 @@ iproto_encode_error(uint32_t error) return error | IPROTO_TYPE_ERROR; } -void +int iproto_reply_ok(struct obuf *out, uint64_t sync, uint32_t schema_version) { - char *buf = (char *)obuf_alloc_xc(out, IPROTO_HEADER_LEN + 1); + char *buf = (char *)obuf_alloc(out, IPROTO_HEADER_LEN + 1); + if (buf == NULL) { + diag_set(OutOfMemory, IPROTO_HEADER_LEN + 1, "obuf_alloc", + "buf"); + return -1; + } iproto_header_encode(buf, IPROTO_OK, sync, schema_version, 1); buf[IPROTO_HEADER_LEN] = 0x80; /* empty MessagePack Map */ + return 0; } int @@ -264,7 +272,7 @@ iproto_reply_error(struct obuf *out, const struct error *e, uint64_t sync, uint32_t schema_version) { uint32_t msg_len = strlen(e->errmsg); - uint32_t errcode = ClientError::get_errcode(e); + uint32_t errcode = box_error_code(e); struct iproto_body_bin body = iproto_error_bin; char *header = (char *)obuf_alloc(out, IPROTO_HEADER_LEN); @@ -284,7 +292,7 @@ void iproto_write_error(int fd, const struct error *e, uint32_t schema_version) { uint32_t msg_len = strlen(e->errmsg); - uint32_t errcode = ClientError::get_errcode(e); + uint32_t errcode = box_error_code(e); char header[IPROTO_HEADER_LEN]; struct iproto_body_bin body = iproto_error_bin; @@ -360,7 +368,7 @@ request_decode(struct request *request, const char *data, uint32_t len, if (mp_typeof(*data) != MP_MAP || mp_check_map(data, end) > 0) { error: - tnt_error(ClientError, ER_INVALID_MSGPACK, "packet body"); + diag_set(ClientError, ER_INVALID_MSGPACK, "packet body"); return -1; } uint32_t size = mp_decode_map(&data); @@ -417,14 +425,14 @@ request_decode(struct request *request, const char *data, uint32_t len, } #ifndef NDEBUG if (data != end) { - tnt_error(ClientError, ER_INVALID_MSGPACK, "packet end"); + diag_set(ClientError, ER_INVALID_MSGPACK, "packet end"); return -1; } #endif if (key_map) { enum iproto_key key = (enum iproto_key) bit_ctz_u64(key_map); - tnt_error(ClientError, ER_MISSING_REQUEST_FIELD, - iproto_key_name(key)); + diag_set(ClientError, ER_MISSING_REQUEST_FIELD, + iproto_key_name(key)); return -1; } return 0; @@ -438,7 +446,11 @@ request_encode(struct request *request, struct iovec *iov) uint32_t key_len = request->key_end - request->key; uint32_t ops_len = request->ops_end - request->ops; uint32_t len = MAP_LEN_MAX + key_len + ops_len; - char *begin = (char *) region_alloc_xc(&fiber()->gc, len); + char *begin = (char *) region_alloc(&fiber()->gc, len); + if (begin == NULL) { + diag_set(OutOfMemory, len, "region_alloc", "begin"); + return -1; + } char *pos = begin + 1; /* skip 1 byte for MP_MAP */ int map_size = 0; if (request->space_id) { @@ -487,12 +499,21 @@ request_encode(struct request *request, struct iovec *iov) struct request * xrow_decode_request(struct xrow_header *row) { - struct request *request; - request = region_alloc_object_xc(&fiber()->gc, struct request); + struct region *region = &fiber()->gc; + size_t used = region_used(region); + struct request *request = region_alloc_object(region, struct request); + if (request == NULL) { + diag_set(OutOfMemory, sizeof(*request), "region_alloc_object", + "request"); + return NULL; + } request_create(request, row->type); - request_decode_xc(request, (const char *) row->body[0].iov_base, - row->body[0].iov_len, - request_key_map(row->type)); + if (request_decode(request, (const char *) row->body[0].iov_base, + row->body[0].iov_len, + request_key_map(row->type)) != 0) { + region_truncate(region, used); + return NULL; + } request->header = row; return request; } @@ -500,9 +521,11 @@ xrow_decode_request(struct xrow_header *row) int xrow_to_iovec(const struct xrow_header *row, struct iovec *out) { - static const int iov0_len = mp_sizeof_uint(UINT32_MAX); - int iovcnt = xrow_header_encode_xc(row, out, iov0_len); - ssize_t len = -iov0_len; + assert(mp_sizeof_uint(UINT32_MAX) == 5); + int iovcnt = xrow_header_encode(row, out, 5); + if (iovcnt < 0) + return -1; + ssize_t len = -5; for (int i = 0; i < iovcnt; i++) len += out[i].iov_len; @@ -515,7 +538,7 @@ xrow_to_iovec(const struct xrow_header *row, struct iovec *out) return iovcnt; } -void +int xrow_encode_auth(struct xrow_header *packet, const char *salt, size_t salt_len, const char *login, size_t login_len, const char *password, size_t password_len) @@ -524,7 +547,11 @@ xrow_encode_auth(struct xrow_header *packet, const char *salt, size_t salt_len, memset(packet, 0, sizeof(*packet)); size_t buf_size = XROW_BODY_LEN_MAX + login_len + SCRAMBLE_SIZE; - char *buf = (char *) region_alloc_xc(&fiber()->gc, buf_size); + char *buf = (char *) region_alloc(&fiber()->gc, buf_size); + if (buf == NULL) { + diag_set(OutOfMemory, buf_size, "region_alloc", "buf"); + return -1; + } char *d = buf; d = mp_encode_map(d, password != NULL ? 2 : 1); @@ -546,6 +573,7 @@ xrow_encode_auth(struct xrow_header *packet, const char *salt, size_t salt_len, packet->body[0].iov_len = (d - buf); packet->bodycnt = 1; packet->type = IPROTO_AUTH; + return 0; } void @@ -558,14 +586,14 @@ xrow_decode_error(struct xrow_header *row) uint32_t map_size; if (row->bodycnt == 0) - goto raise; + goto error; pos = (char *) row->body[0].iov_base; if (mp_check(&pos, pos + row->body[0].iov_len)) - goto raise; + goto error; pos = (char *) row->body[0].iov_base; if (mp_typeof(*pos) != MP_MAP) - goto raise; + goto error; map_size = mp_decode_map(&pos); for (uint32_t i = 0; i < map_size; i++) { if (mp_typeof(*pos) != MP_UINT) { @@ -584,12 +612,11 @@ xrow_decode_error(struct xrow_header *row) snprintf(error, sizeof(error), "%.*s", len, str); } -raise: +error: box_error_set(__FILE__, __LINE__, code, error); - diag_raise(); } -void +int xrow_encode_subscribe(struct xrow_header *row, const struct tt_uuid *replicaset_uuid, const struct tt_uuid *instance_uuid, @@ -599,7 +626,11 @@ xrow_encode_subscribe(struct xrow_header *row, uint32_t replicaset_size = vclock_size(vclock); size_t size = XROW_BODY_LEN_MAX + replicaset_size * (mp_sizeof_uint(UINT32_MAX) + mp_sizeof_uint(UINT64_MAX)); - char *buf = (char *) region_alloc_xc(&fiber()->gc, size); + char *buf = (char *) region_alloc(&fiber()->gc, size); + if (buf == NULL) { + diag_set(OutOfMemory, size, "region_alloc", "buf"); + return -1; + } char *data = buf; data = mp_encode_map(data, 3); data = mp_encode_uint(data, IPROTO_CLUSTER_UUID); @@ -619,20 +650,25 @@ xrow_encode_subscribe(struct xrow_header *row, row->body[0].iov_len = (data - buf); row->bodycnt = 1; row->type = IPROTO_SUBSCRIBE; + return 0; } -void +int xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, struct tt_uuid *instance_uuid, struct vclock *vclock) { - if (row->bodycnt == 0) - tnt_raise(ClientError, ER_INVALID_MSGPACK, "request body"); + if (row->bodycnt == 0) { + diag_set(ClientError, ER_INVALID_MSGPACK, "request body"); + return -1; + } assert(row->bodycnt == 1); const char *data = (const char *) row->body[0].iov_base; const char *end = data + row->body[0].iov_len; const char *d = data; - if (mp_check(&d, end) != 0 || mp_typeof(*data) != MP_MAP) - tnt_raise(ClientError, ER_INVALID_MSGPACK, "request body"); + if (mp_check(&d, end) != 0 || mp_typeof(*data) != MP_MAP) { + diag_set(ClientError, ER_INVALID_MSGPACK, "request body"); + return -1; + } const char *lsnmap = NULL; d = data; @@ -648,19 +684,22 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, case IPROTO_CLUSTER_UUID: if (replicaset_uuid == NULL) goto skip; - xrow_decode_uuid(&d, replicaset_uuid); + if (xrow_decode_uuid(&d, replicaset_uuid) != 0) + return -1; break; case IPROTO_INSTANCE_UUID: if (instance_uuid == NULL) goto skip; - xrow_decode_uuid(&d, instance_uuid); + if (xrow_decode_uuid(&d, instance_uuid) != 0) + return -1; break; case IPROTO_VCLOCK: if (vclock == NULL) goto skip; if (mp_typeof(*d) != MP_MAP) { - tnt_raise(ClientError, ER_INVALID_MSGPACK, - "invalid VCLOCK"); + diag_set(ClientError, ER_INVALID_MSGPACK, + "invalid VCLOCK"); + return -1; } lsnmap = d; mp_next(&d); @@ -671,7 +710,7 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, } if (lsnmap == NULL) - return; + return 0; /* Check & save LSNMAP */ d = lsnmap; @@ -679,7 +718,8 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, for (uint32_t i = 0; i < lsnmap_size; i++) { if (mp_typeof(*d) != MP_UINT) { map_error: - tnt_raise(ClientError, ER_INVALID_MSGPACK, "VCLOCK"); + diag_set(ClientError, ER_INVALID_MSGPACK, "VCLOCK"); + return -1; } uint32_t id = mp_decode_uint(&d); if (mp_typeof(*d) != MP_UINT) @@ -688,15 +728,20 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, if (lsn > 0) vclock_follow(vclock, id, lsn); } + return 0; } -void +int xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid) { memset(row, 0, sizeof(*row)); size_t size = 64; - char *buf = (char *) region_alloc_xc(&fiber()->gc, size); + char *buf = (char *) region_alloc(&fiber()->gc, size); + if (buf == NULL) { + diag_set(OutOfMemory, size, "region_alloc", "buf"); + return -1; + } char *data = buf; data = mp_encode_map(data, 1); data = mp_encode_uint(data, IPROTO_INSTANCE_UUID); @@ -708,9 +753,10 @@ xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid) row->body[0].iov_len = (data - buf); row->bodycnt = 1; row->type = IPROTO_JOIN; + return 0; } -void +int xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock) { memset(row, 0, sizeof(*row)); @@ -719,7 +765,11 @@ xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock) uint32_t replicaset_size = vclock_size(vclock); size_t size = 8 + replicaset_size * (mp_sizeof_uint(UINT32_MAX) + mp_sizeof_uint(UINT64_MAX)); - char *buf = (char *) region_alloc_xc(&fiber()->gc, size); + char *buf = (char *) region_alloc(&fiber()->gc, size); + if (buf == NULL) { + diag_set(OutOfMemory, size, "region_alloc", "buf"); + return -1; + } char *data = buf; data = mp_encode_map(data, 1); data = mp_encode_uint(data, IPROTO_VCLOCK); @@ -735,11 +785,12 @@ xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock) row->body[0].iov_len = (data - buf); row->bodycnt = 1; row->type = IPROTO_OK; + return 0; } void -greeting_encode(char *greetingbuf, uint32_t version_id, const tt_uuid *uuid, - const char *salt, uint32_t salt_len) +greeting_encode(char *greetingbuf, uint32_t version_id, + const struct tt_uuid *uuid, const char *salt, uint32_t salt_len) { int h = IPROTO_GREETING_SIZE / 2; int r = snprintf(greetingbuf, h + 1, "Tarantool %u.%u.%u (Binary) ", diff --git a/src/box/xrow.h b/src/box/xrow.h index e202e270f02d63c07763c32aa45d5d1ba2c07cb7..53e650c6d323715881b41fc4ccf5c944707e3578 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -96,6 +96,87 @@ int xrow_header_decode(struct xrow_header *header, const char **pos, const char *end); +/** + * Decode a request from the @row. + * @param row Xrow. + * @retval not NULL Decoded request. + * + * @retval NULL Memory or binary format error. + */ +struct request * +xrow_decode_request(struct xrow_header *row); + +/** + * Encode AUTH command. + * @param[out] Row. + * @param salt Salt from IPROTO greeting. + * @param salt_len Length of @salt. + * @param login User login. + * @param login_len Length of @login. + * @param password User password. + * @param password_len Length of @password. + * + * @retval 0 Success. + * @retval -1 Memory error. +*/ +int +xrow_encode_auth(struct xrow_header *row, const char *salt, size_t salt_len, + const char *login, size_t login_len, const char *password, + size_t password_len); + +struct vclock; +/** + * Encode SUBSCRIBE command. + * @param[out] Row. + * @param replicaset_uuid Replica set uuid. + * @param instance_uuid Instance uuid. + * @param vclock Replication clock. + * + * @retval 0 Success. + * @retval -1 Memory error. + */ +int +xrow_encode_subscribe(struct xrow_header *row, + const struct tt_uuid *replicaset_uuid, + const struct tt_uuid *instance_uuid, + const struct vclock *vclock); + +/** + * Decode SUBSCRIBE command. + * @param row Row to decode. + * @param[out] replicaset_uuid. + * @param[out] instance_uuid. + * @param[out] vclock. + * + * @retval 0 Success. + * @retval -1 Memory or format error. + */ +int +xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, + struct tt_uuid *instance_uuid, struct vclock *vclock); + +/** + * Encode JOIN command. + * @param[out] row Row to encode into. + * @param instance_uuid. + * + * @retval 0 Success. + * @retval -1 Memory error. + */ +int +xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid); + +/** + * Encode end of stream command (a response to JOIN command). + * @param row[out] Row to encode into. + * @param vclock. + * + * @retval 0 Success. + * @retval -1 Memory error. + */ +int +xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock); + /** * Fast encode xrow header using the specified header fields. * It is faster than the xrow_header_encode, because uses @@ -132,8 +213,16 @@ void iproto_reply_select(struct obuf *buf, struct obuf_svp *svp, uint64_t sync, uint32_t schema_version, uint32_t count); -/** Stack a reply to 'ping' packet. */ -void +/* + * Encode iproto header with IPROTO_OK response code. + * @param out Encode to. + * @param sync Request sync. + * @param schema_version. + * + * @retval 0 Success. + * @retval -1 Memory error. + */ +int iproto_reply_ok(struct obuf *out, uint64_t sync, uint32_t schema_version); /** @@ -239,10 +328,9 @@ struct greeting { * * \sa greeting_decode() */ - void -greeting_encode(char *greetingbuf, uint32_t version_id, const - struct tt_uuid *uuid, const char *salt, +greeting_encode(char *greetingbuf, uint32_t version_id, + const struct tt_uuid *uuid, const char *salt, uint32_t salt_len); /** @@ -259,6 +347,25 @@ greeting_encode(char *greetingbuf, uint32_t version_id, const int greeting_decode(const char *greetingbuf, struct greeting *greeting); +/** + * Encode an xrow record into the specified iovec. + * + * @param row Record to encode. + * @param[out] out Encoded record. + * + * @retval >= 0 Used iovector components. + * @retval -1 Error. + */ +int +xrow_to_iovec(const struct xrow_header *row, struct iovec *out); + +/** + * Decode ERROR and set it to diagnostics area. + * @param row Encoded error. + */ +void +xrow_decode_error(struct xrow_header *row); + #if defined(__cplusplus) } /* extern "C" */ @@ -273,33 +380,22 @@ xrow_header_decode_xc(struct xrow_header *header, const char **pos, diag_raise(); } -void -xrow_decode_uuid(const char **pos, struct tt_uuid *out); - -char * -xrow_encode_uuid(char *pos, const struct tt_uuid *in); - -int -xrow_to_iovec(const struct xrow_header *row, struct iovec *out); - -/** - * \brief Decode ERROR and re-throw it as ClientError exception - * \param row -*/ -void -xrow_decode_error(struct xrow_header *row); - -/** - * @copydoc xrow_header_encode() - */ +/** @copydoc xrow_to_iovec. */ static inline int -xrow_header_encode_xc(const struct xrow_header *header, - struct iovec *out, size_t fixheader_len) +xrow_to_iovec_xc(const struct xrow_header *row, struct iovec *out) { - int iovcnt = xrow_header_encode(header, out, fixheader_len); - if (iovcnt < 0) + int rc = xrow_to_iovec(row, out); + if (rc < 0) diag_raise(); - return iovcnt; + return rc; +} + +/** @copydoc xrow_decode_error. */ +static inline void +xrow_decode_error_xc(struct xrow_header *row) +{ + xrow_decode_error(row); + diag_raise(); } static inline void @@ -319,55 +415,58 @@ request_encode_xc(struct request *request, struct iovec *iov) return iovcnt; } -struct request * -xrow_decode_request(struct xrow_header *row); +/** @copydoc xrow_decode_request. */ +static inline struct request * +xrow_decode_request_xc(struct xrow_header *row) +{ + struct request *ret = xrow_decode_request(row); + if (ret == NULL) + diag_raise(); + return ret; +} -/** - * \brief Encode AUTH command - * \param[out] row - * \param salt - salt from IPROTO greeting - * \param salt_len length of \a salt - * \param login - user login - * \param login_len - length of \a login - * \param password - user password - * \param password_len - length of \a password -*/ -void -xrow_encode_auth(struct xrow_header *row, const char *salt, size_t salt_len, - const char *login, size_t login_len, - const char *password, size_t password_len); +/** @copydoc xrow_encode_auth. */ +static inline void +xrow_encode_auth_xc(struct xrow_header *row, const char *salt, size_t salt_len, + const char *login, size_t login_len, const char *password, + size_t password_len) +{ + if (xrow_encode_auth(row, salt, salt_len, login, login_len, password, + password_len) != 0) + diag_raise(); +} -/** - * \brief Encode SUBSCRIBE command - * \param row[out] - * \param replicaset_uuid replica set uuid - * \param instance_uuid instance uuid - * \param vclock replication clock -*/ -void -xrow_encode_subscribe(struct xrow_header *row, - const struct tt_uuid *replicaset_uuid, - const struct tt_uuid *instance_uuid, - const struct vclock *vclock); +/** @copydoc xrow_encode_subscribe. */ +static inline void +xrow_encode_subscribe_xc(struct xrow_header *row, + const struct tt_uuid *replicaset_uuid, + const struct tt_uuid *instance_uuid, + const struct vclock *vclock) +{ + if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid, + vclock) != 0) + diag_raise(); +} -/** - * \brief Decode SUBSCRIBE command - * \param row - * \param[out] replicaset_uuid - * \param[out] instance_uuid - * \param[out] vclock -*/ -void -xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, - struct tt_uuid *instance_uuid, struct vclock *vclock); +/** @copydoc xrow_decode_subscribe. */ +static inline void +xrow_decode_subscribe_xc(struct xrow_header *row, + struct tt_uuid *replicaset_uuid, + struct tt_uuid *instance_uuid, struct vclock *vclock) +{ + if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid, + vclock) != 0) + diag_raise(); +} -/** - * \brief Encode JOIN command - * \param[out] row - * \param instance_uuid -*/ -void -xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid); +/** @copydoc xrow_encode_join. */ +static inline void +xrow_encode_join_xc(struct xrow_header *row, + const struct tt_uuid *instance_uuid) +{ + if (xrow_encode_join(row, instance_uuid) != 0) + diag_raise(); +} /** * \brief Decode JOIN command @@ -377,16 +476,16 @@ xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid); static inline void xrow_decode_join(struct xrow_header *row, struct tt_uuid *instance_uuid) { - return xrow_decode_subscribe(row, NULL, instance_uuid, NULL); + xrow_decode_subscribe_xc(row, NULL, instance_uuid, NULL); } -/** - * \brief Encode end of stream command (a response to JOIN command) - * \param row[out] - * \param vclock -*/ -void -xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock); +/** @copydoc xrow_encode_vclock. */ +static inline void +xrow_encode_vclock_xc(struct xrow_header *row, const struct vclock *vclock) +{ + if (xrow_encode_vclock(row, vclock) != 0) + diag_raise(); +} /** * \brief Decode end of stream command (a response to JOIN command) @@ -396,7 +495,15 @@ xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock); static inline void xrow_decode_vclock(struct xrow_header *row, struct vclock *vclock) { - return xrow_decode_subscribe(row, NULL, NULL, vclock); + xrow_decode_subscribe_xc(row, NULL, NULL, vclock); +} + +/** @copydoc iproto_reply_ok. */ +static inline void +iproto_reply_ok_xc(struct obuf *out, uint64_t sync, uint32_t schema_version) +{ + if (iproto_reply_ok(out, sync, schema_version) != 0) + diag_raise(); } #endif diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc index 3ee1cbcb025332915bc5944fabebcb3462231f95..f2fc772598dd9a6727170013c95402b7b365c5f5 100644 --- a/src/box/xrow_io.cc +++ b/src/box/xrow_io.cc @@ -65,7 +65,7 @@ void coio_write_xrow(struct ev_io *coio, const struct xrow_header *row) { struct iovec iov[XROW_IOVMAX]; - int iovcnt = xrow_to_iovec(row, iov); + int iovcnt = xrow_to_iovec_xc(row, iov); coio_writev(coio, iov, iovcnt, 0); }