diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 246749e2e5f23b6353d5620f9d108cfd2a32038d..93910008f6bc7e659e799d68de95322a48eacdec 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -407,9 +407,16 @@ iproto_msg_new(struct iproto_connection *con); static void iproto_resume(struct iproto_thread *iproto_thread); +/** + * Prepares IPROTO message: decodes the message header, checks the message's + * stream identifier, and set's the message's cbus route. + * If the message contains a replication request, sets the stop input flag, + * which means the connection's buffers needs to be detached from the event + * loop. + */ static void -iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, - bool *stop_input); +iproto_msg_prepare(struct iproto_msg *msg, const char **pos, const char *reqend, + bool *stop_input); enum rmean_net_name { IPROTO_SENT, @@ -1155,7 +1162,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) msg->len = reqend - reqstart; /* total request length */ - iproto_msg_decode(msg, &pos, reqend, &stop_input); + iproto_msg_prepare(msg, &pos, reqend, &stop_input); int rc = iproto_msg_start_processing_in_stream(msg); if (rc < 0) { @@ -1532,17 +1539,28 @@ net_end_join(struct cmsg *msg); static void net_end_subscribe(struct cmsg *msg); +/** + * Decodes the IPROTO message and returns the route corresponding to the message + * type. + * Can be called from both IPROTO and TX threads. + */ +static int +iproto_msg_decode(struct iproto_msg *msg, struct cmsg_hop **route); + static void -iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, - bool *stop_input) +iproto_msg_prepare(struct iproto_msg *msg, const char **pos, const char *reqend, + bool *stop_input) { uint64_t stream_id; uint8_t type; bool request_is_not_for_stream; bool request_is_only_for_stream; + bool is_replication_request; struct iproto_thread *iproto_thread = msg->connection->iproto_thread; + struct cmsg_hop *route; + int rc; - if (xrow_header_decode(&msg->header, pos, reqend, true)) + if (xrow_header_decode(&msg->header, pos, reqend, true) != 0) goto error; assert(*pos == reqend); @@ -1566,11 +1584,34 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, goto error; } - /* - * Parse request before putting it into the queue - * to save tx some CPU. More complicated requests are - * parsed in tx thread into request type-specific objects. - */ + is_replication_request = type == IPROTO_JOIN || + type == IPROTO_FETCH_SNAPSHOT || + type == IPROTO_REGISTER || + type == IPROTO_SUBSCRIBE; + if (is_replication_request) + *stop_input = true; + + rc = iproto_msg_decode(msg, &route); + if (rc == 0) { + assert(route != NULL); + cmsg_init(&msg->base, route); + return; + } + if (route == NULL) + diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE, (uint32_t)type); +error: + /** Log and send the error. */ + diag_log(); + diag_create(&msg->diag); + diag_move(&fiber()->diag, &msg->diag); + cmsg_init(&msg->base, iproto_thread->error_route); +} + +static int +iproto_msg_decode(struct iproto_msg *msg, struct cmsg_hop **route) +{ + uint8_t type = msg->header.type; + struct iproto_thread *iproto_thread = msg->connection->iproto_thread; switch (type) { case IPROTO_SELECT: case IPROTO_INSERT: @@ -1578,98 +1619,86 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, case IPROTO_UPDATE: case IPROTO_DELETE: case IPROTO_UPSERT: + assert(type < sizeof(iproto_thread->dml_route) / + sizeof(*iproto_thread->dml_route)); + *route = iproto_thread->dml_route[type]; if (xrow_decode_dml(&msg->header, &msg->dml, dml_request_key_map(type))) - goto error; + return -1; /* * In contrast to replication requests, for a client request * the xrow header is set by WAL, which generates LSNs and sets * replica id. Ignore the header received over network. */ msg->dml.header = NULL; - assert(type < sizeof(iproto_thread->dml_route) / - sizeof(*(iproto_thread->dml_route))); - cmsg_init(&msg->base, iproto_thread->dml_route[type]); - break; + return 0; case IPROTO_BEGIN: + *route = iproto_thread->begin_route; if (xrow_decode_begin(&msg->header, &msg->begin) != 0) - goto error; - cmsg_init(&msg->base, iproto_thread->begin_route); - break; + return -1; + return 0; case IPROTO_COMMIT: - cmsg_init(&msg->base, iproto_thread->commit_route); - break; + *route = iproto_thread->commit_route; + return 0; case IPROTO_ROLLBACK: - cmsg_init(&msg->base, iproto_thread->rollback_route); - break; + *route = iproto_thread->rollback_route; + return 0; case IPROTO_CALL_16: case IPROTO_CALL: case IPROTO_EVAL: + *route = iproto_thread->call_route; if (xrow_decode_call(&msg->header, &msg->call)) - goto error; - cmsg_init(&msg->base, iproto_thread->call_route); - break; + return -1; + return 0; case IPROTO_WATCH: case IPROTO_UNWATCH: + *route = iproto_thread->misc_route; ERROR_INJECT(ERRINJ_IPROTO_DISABLE_WATCH, { - diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE, - (uint32_t)type); - goto error; + *route = NULL; + return -1; }); if (xrow_decode_watch(&msg->header, &msg->watch) != 0) - goto error; - cmsg_init(&msg->base, iproto_thread->misc_route); - break; + return -1; + return 0; case IPROTO_EXECUTE: case IPROTO_PREPARE: + *route = iproto_thread->sql_route; if (xrow_decode_sql(&msg->header, &msg->sql) != 0) - goto error; - cmsg_init(&msg->base, iproto_thread->sql_route); - break; + return -1; + return 0; case IPROTO_PING: - cmsg_init(&msg->base, iproto_thread->misc_route); - break; + *route = iproto_thread->misc_route; + return 0; case IPROTO_ID: + *route = iproto_thread->misc_route; ERROR_INJECT(ERRINJ_IPROTO_DISABLE_ID, { - diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE, - (uint32_t)type); - goto error; + *route = NULL; + return -1; }); if (xrow_decode_id(&msg->header, &msg->id) != 0) - goto error; - cmsg_init(&msg->base, iproto_thread->misc_route); - break; + return -1; + return 0; case IPROTO_JOIN: case IPROTO_FETCH_SNAPSHOT: case IPROTO_REGISTER: - cmsg_init(&msg->base, iproto_thread->join_route); - *stop_input = true; - break; + *route = iproto_thread->join_route; + return 0; case IPROTO_SUBSCRIBE: - cmsg_init(&msg->base, iproto_thread->subscribe_route); - *stop_input = true; - break; + *route = iproto_thread->subscribe_route; + return 0; case IPROTO_VOTE_DEPRECATED: case IPROTO_VOTE: - cmsg_init(&msg->base, iproto_thread->misc_route); - break; + *route = iproto_thread->misc_route; + return 0; case IPROTO_AUTH: + *route = iproto_thread->misc_route; if (xrow_decode_auth(&msg->header, &msg->auth)) - goto error; - cmsg_init(&msg->base, iproto_thread->misc_route); - break; + return -1; + return 0; default: - diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE, - (uint32_t) type); - goto error; + *route = NULL; + return -1; } - return; -error: - /** Log and send the error. */ - diag_log(); - diag_create(&msg->diag); - diag_move(&fiber()->diag, &msg->diag); - cmsg_init(&msg->base, iproto_thread->error_route); } static void