From 18be132906ea969e69f444a095a06b26d59609a4 Mon Sep 17 00:00:00 2001 From: Konstantin Osipov <kostja@tarantool.org> Date: Fri, 17 Nov 2017 00:39:48 +0300 Subject: [PATCH] iproto: separate implementation iproto_msg and iproto_connection This file used to be smaller so not having extra forward declarations seemed to be reasonable. Now it's time to split the code of iproto_msg and iproto_connection. No semantical changes. --- src/box/iproto.cc | 475 ++++++++++++++++++++++++---------------------- 1 file changed, 243 insertions(+), 232 deletions(-) diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 18a0627118..c65fd4be62 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -138,6 +138,9 @@ iproto_msg_new(struct iproto_connection *con) static void iproto_resume(); +static void +iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, + bool *stop_input); static inline void iproto_msg_delete(struct cmsg *msg) @@ -146,10 +149,6 @@ iproto_msg_delete(struct cmsg *msg) iproto_resume(); } -/* }}} */ - -/* {{{ iproto connection and requests */ - /** * A single global queue for all requests in all connections. All * requests from all connections are processed concurrently. @@ -176,6 +175,21 @@ enum rmean_net_name { const char *rmean_net_strings[IPROTO_LAST] = { "SENT", "RECEIVED" }; +static void +tx_process_disconnect(struct cmsg *m); + +static void +net_finish_disconnect(struct cmsg *m); + +static const struct cmsg_hop disconnect_route[] = { + { tx_process_disconnect, &net_pipe }, + { net_finish_disconnect, NULL }, +}; + +/* }}} */ + +/* {{{ iproto_connection - declaration and definition */ + /** * Context of a single client connection. * Interaction scheme: @@ -342,179 +356,6 @@ iproto_write_error_blocking(int sock, const struct error *e, uint64_t sync) (void) fcntl(sock, F_SETFL, flags); } -static void -iproto_connection_on_input(ev_loop * /* loop */, struct ev_io *watcher, - int /* revents */); -static void -iproto_connection_on_output(ev_loop * /* loop */, struct ev_io *watcher, - int /* revents */); - -/** Recycle a connection. Never throws. */ -static inline void -iproto_connection_delete(struct iproto_connection *con) -{ - assert(iproto_connection_is_idle(con)); - assert(!evio_has_fd(&con->output)); - assert(!evio_has_fd(&con->input)); - assert(con->session == NULL); - /* - * The output buffers must have been deleted - * in tx thread. - */ - ibuf_destroy(&con->ibuf[0]); - ibuf_destroy(&con->ibuf[1]); - assert(con->obuf[0].pos == 0 && - con->obuf[0].iov[0].iov_base == NULL); - assert(con->obuf[1].pos == 0 && - con->obuf[1].iov[0].iov_base == NULL); - if (con->disconnect) - iproto_msg_delete(con->disconnect); - mempool_free(&iproto_connection_pool, con); -} - -static void -tx_process_misc(struct cmsg *msg); -static void -tx_process1(struct cmsg *msg); -static void -tx_process_select(struct cmsg *msg); - -static void -tx_reply_error(struct iproto_msg *msg); - -static void -net_send_msg(struct cmsg *msg); - -static void -tx_process_join_subscribe(struct cmsg *msg); -static void -net_end_join(struct cmsg *msg); -static void -net_end_subscribe(struct cmsg *msg); - -static void -tx_fiber_init(struct session *session, uint64_t sync) -{ - session->sync = sync; - /* - * We do not cleanup fiber keys at the end of each request. - * This does not lead to privilege escalation as long as - * fibers used to serve iproto requests never mingle with - * fibers used to serve background tasks without going - * through the purification of fiber_recycle(), which - * resets the fiber local storage. Fibers, used to run - * background tasks clean up their session in on_stop - * trigger as well. - */ - fiber_set_session(fiber(), session); - fiber_set_user(fiber(), &session->credentials); -} - -/** - * Fire on_disconnect triggers in the tx - * thread and destroy the session object, - * as well as output buffers of the connection. - */ -static void -tx_process_disconnect(struct cmsg *m) -{ - struct iproto_msg *msg = (struct iproto_msg *) m; - struct iproto_connection *con = msg->connection; - if (con->session) { - tx_fiber_init(con->session, 0); - if (! rlist_empty(&session_on_disconnect)) - session_run_on_disconnect_triggers(con->session); - session_destroy(con->session); - con->session = NULL; /* safety */ - } - /* - * Got to be done in iproto thread since - * that's where the memory is allocated. - */ - obuf_destroy(&con->obuf[0]); - obuf_destroy(&con->obuf[1]); -} - -/** - * Cleanup the net thread resources of a connection - * and close the connection. - */ -static void -net_finish_disconnect(struct cmsg *m) -{ - struct iproto_msg *msg = (struct iproto_msg *) m; - /* Runs the trigger, which may yield. */ - iproto_connection_delete(msg->connection); - iproto_msg_delete(msg); -} - -static const struct cmsg_hop disconnect_route[] = { - { tx_process_disconnect, &net_pipe }, - { net_finish_disconnect, NULL }, -}; - -static const struct cmsg_hop misc_route[] = { - { tx_process_misc, &net_pipe }, - { net_send_msg, NULL }, -}; - -static const struct cmsg_hop select_route[] = { - { tx_process_select, &net_pipe }, - { net_send_msg, NULL }, -}; - -static const struct cmsg_hop process1_route[] = { - { tx_process1, &net_pipe }, - { net_send_msg, NULL }, -}; - -static const struct cmsg_hop *dml_route[IPROTO_TYPE_STAT_MAX] = { - NULL, /* IPROTO_OK */ - select_route, /* IPROTO_SELECT */ - process1_route, /* IPROTO_INSERT */ - process1_route, /* IPROTO_REPLACE */ - process1_route, /* IPROTO_UPDATE */ - process1_route, /* IPROTO_DELETE */ - misc_route, /* IPROTO_CALL_16 */ - misc_route, /* IPROTO_AUTH */ - misc_route, /* IPROTO_EVAL */ - process1_route, /* IPROTO_UPSERT */ - misc_route /* IPROTO_CALL */ -}; - -static const struct cmsg_hop join_route[] = { - { tx_process_join_subscribe, &net_pipe }, - { net_end_join, NULL }, -}; - -static const struct cmsg_hop subscribe_route[] = { - { tx_process_join_subscribe, &net_pipe }, - { net_end_subscribe, NULL }, -}; - -static struct iproto_connection * -iproto_connection_new(int fd) -{ - struct iproto_connection *con = (struct iproto_connection *) - mempool_alloc_xc(&iproto_connection_pool); - con->input.data = con->output.data = con; - con->loop = loop(); - ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ); - ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE); - ibuf_create(&con->ibuf[0], cord_slab_cache(), iobuf_readahead); - ibuf_create(&con->ibuf[1], cord_slab_cache(), iobuf_readahead); - obuf_create(&con->obuf[0], &tx_cord->slabc, iobuf_readahead); - obuf_create(&con->obuf[1], &tx_cord->slabc, iobuf_readahead); - con->p_ibuf = &con->ibuf[0]; - con->parse_size = 0; - con->session = NULL; - rlist_create(&con->in_stop_list); - /* It may be very awkward to allocate at close. */ - con->disconnect = iproto_msg_new(con); - cmsg_init(con->disconnect, disconnect_route); - return con; -} - /** * Initiate a connection shutdown. This method may * be invoked many times, and does the internal @@ -678,60 +519,6 @@ iproto_connection_input_buffer(struct iproto_connection *con) return new_ibuf; } -static void -iproto_decode_msg(struct iproto_msg *msg, const char **pos, const char *reqend, - bool *stop_input) -{ - xrow_header_decode_xc(&msg->header, pos, reqend); - assert(*pos == reqend); - uint8_t type = msg->header.type; - - /* - * Parse request before putting it into the queue - * to save tx some CPU. More complicated requests are - * parsed in tx thread into request type-specific objects. - */ - switch (type) { - case IPROTO_SELECT: - case IPROTO_INSERT: - case IPROTO_REPLACE: - case IPROTO_UPDATE: - case IPROTO_DELETE: - case IPROTO_UPSERT: - xrow_decode_dml_xc(&msg->header, &msg->dml, - dml_request_key_map(type)); - assert(type < sizeof(dml_route)/sizeof(*dml_route)); - cmsg_init(msg, dml_route[type]); - break; - case IPROTO_CALL_16: - case IPROTO_CALL: - case IPROTO_EVAL: - xrow_decode_call_xc(&msg->header, &msg->call); - cmsg_init(msg, misc_route); - break; - case IPROTO_PING: - cmsg_init(msg, misc_route); - break; - case IPROTO_JOIN: - cmsg_init(msg, join_route); - *stop_input = true; - break; - case IPROTO_SUBSCRIBE: - cmsg_init(msg, subscribe_route); - *stop_input = true; - break; - case IPROTO_AUTH: - xrow_decode_auth_xc(&msg->header, &msg->auth); - cmsg_init(msg, misc_route); - break; - default: - tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE, - (uint32_t) type); - break; - } - return; -} - /** Enqueue all requests which were read up. */ static inline void iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) @@ -761,7 +548,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) msg->len = reqend - reqstart; /* total request length */ try { - iproto_decode_msg(msg, &pos, reqend, &stop_input); + iproto_msg_decode(msg, &pos, reqend, &stop_input); /* * This can't throw, but should not be * done in case of exception. @@ -990,6 +777,230 @@ iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher, } } +static struct iproto_connection * +iproto_connection_new(int fd) +{ + struct iproto_connection *con = (struct iproto_connection *) + mempool_alloc_xc(&iproto_connection_pool); + con->input.data = con->output.data = con; + con->loop = loop(); + ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ); + ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE); + ibuf_create(&con->ibuf[0], cord_slab_cache(), iobuf_readahead); + ibuf_create(&con->ibuf[1], cord_slab_cache(), iobuf_readahead); + obuf_create(&con->obuf[0], &tx_cord->slabc, iobuf_readahead); + obuf_create(&con->obuf[1], &tx_cord->slabc, iobuf_readahead); + con->p_ibuf = &con->ibuf[0]; + con->parse_size = 0; + con->session = NULL; + rlist_create(&con->in_stop_list); + /* It may be very awkward to allocate at close. */ + con->disconnect = iproto_msg_new(con); + cmsg_init(con->disconnect, disconnect_route); + return con; +} + +/** Recycle a connection. Never throws. */ +static inline void +iproto_connection_delete(struct iproto_connection *con) +{ + assert(iproto_connection_is_idle(con)); + assert(!evio_has_fd(&con->output)); + assert(!evio_has_fd(&con->input)); + assert(con->session == NULL); + /* + * The output buffers must have been deleted + * in tx thread. + */ + ibuf_destroy(&con->ibuf[0]); + ibuf_destroy(&con->ibuf[1]); + assert(con->obuf[0].pos == 0 && + con->obuf[0].iov[0].iov_base == NULL); + assert(con->obuf[1].pos == 0 && + con->obuf[1].iov[0].iov_base == NULL); + if (con->disconnect) + iproto_msg_delete(con->disconnect); + mempool_free(&iproto_connection_pool, con); +} + +/* }}} iproto_connection */ + +/* {{{ iproto_msg - methods and routes */ + +static void +tx_process_misc(struct cmsg *msg); + +static void +tx_process1(struct cmsg *msg); + +static void +tx_process_select(struct cmsg *msg); + +static void +tx_reply_error(struct iproto_msg *msg); + +static void +net_send_msg(struct cmsg *msg); + +static void +tx_process_join_subscribe(struct cmsg *msg); + +static void +net_end_join(struct cmsg *msg); + +static void +net_end_subscribe(struct cmsg *msg); + +static const struct cmsg_hop misc_route[] = { + { tx_process_misc, &net_pipe }, + { net_send_msg, NULL }, +}; + +static const struct cmsg_hop select_route[] = { + { tx_process_select, &net_pipe }, + { net_send_msg, NULL }, +}; + +static const struct cmsg_hop process1_route[] = { + { tx_process1, &net_pipe }, + { net_send_msg, NULL }, +}; + +static const struct cmsg_hop *dml_route[IPROTO_TYPE_STAT_MAX] = { + NULL, /* IPROTO_OK */ + select_route, /* IPROTO_SELECT */ + process1_route, /* IPROTO_INSERT */ + process1_route, /* IPROTO_REPLACE */ + process1_route, /* IPROTO_UPDATE */ + process1_route, /* IPROTO_DELETE */ + misc_route, /* IPROTO_CALL_16 */ + misc_route, /* IPROTO_AUTH */ + misc_route, /* IPROTO_EVAL */ + process1_route, /* IPROTO_UPSERT */ + misc_route /* IPROTO_CALL */ +}; + +static const struct cmsg_hop join_route[] = { + { tx_process_join_subscribe, &net_pipe }, + { net_end_join, NULL }, +}; + +static const struct cmsg_hop subscribe_route[] = { + { tx_process_join_subscribe, &net_pipe }, + { net_end_subscribe, NULL }, +}; + +static void +iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, + bool *stop_input) +{ + xrow_header_decode_xc(&msg->header, pos, reqend); + assert(*pos == reqend); + uint8_t type = msg->header.type; + + /* + * Parse request before putting it into the queue + * to save tx some CPU. More complicated requests are + * parsed in tx thread into request type-specific objects. + */ + switch (type) { + case IPROTO_SELECT: + case IPROTO_INSERT: + case IPROTO_REPLACE: + case IPROTO_UPDATE: + case IPROTO_DELETE: + case IPROTO_UPSERT: + xrow_decode_dml_xc(&msg->header, &msg->dml, + dml_request_key_map(type)); + assert(type < sizeof(dml_route)/sizeof(*dml_route)); + cmsg_init(msg, dml_route[type]); + break; + case IPROTO_CALL_16: + case IPROTO_CALL: + case IPROTO_EVAL: + xrow_decode_call_xc(&msg->header, &msg->call); + cmsg_init(msg, misc_route); + break; + case IPROTO_PING: + cmsg_init(msg, misc_route); + break; + case IPROTO_JOIN: + cmsg_init(msg, join_route); + *stop_input = true; + break; + case IPROTO_SUBSCRIBE: + cmsg_init(msg, subscribe_route); + *stop_input = true; + break; + case IPROTO_AUTH: + xrow_decode_auth_xc(&msg->header, &msg->auth); + cmsg_init(msg, misc_route); + break; + default: + tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE, + (uint32_t) type); + break; + } + return; +} + +static void +tx_fiber_init(struct session *session, uint64_t sync) +{ + session->sync = sync; + /* + * We do not cleanup fiber keys at the end of each request. + * This does not lead to privilege escalation as long as + * fibers used to serve iproto requests never mingle with + * fibers used to serve background tasks without going + * through the purification of fiber_recycle(), which + * resets the fiber local storage. Fibers, used to run + * background tasks clean up their session in on_stop + * trigger as well. + */ + fiber_set_session(fiber(), session); + fiber_set_user(fiber(), &session->credentials); +} + +/** + * Fire on_disconnect triggers in the tx + * thread and destroy the session object, + * as well as output buffers of the connection. + */ +static void +tx_process_disconnect(struct cmsg *m) +{ + struct iproto_msg *msg = (struct iproto_msg *) m; + struct iproto_connection *con = msg->connection; + if (con->session) { + tx_fiber_init(con->session, 0); + if (! rlist_empty(&session_on_disconnect)) + session_run_on_disconnect_triggers(con->session); + session_destroy(con->session); + con->session = NULL; /* safety */ + } + /* + * Got to be done in iproto thread since + * that's where the memory is allocated. + */ + obuf_destroy(&con->obuf[0]); + obuf_destroy(&con->obuf[1]); +} + +/** + * Cleanup the net thread resources of a connection + * and close the connection. + */ +static void +net_finish_disconnect(struct cmsg *m) +{ + struct iproto_msg *msg = (struct iproto_msg *) m; + /* Runs the trigger, which may yield. */ + iproto_connection_delete(msg->connection); + iproto_msg_delete(msg); +} + + static int tx_check_schema(uint32_t new_schema_version) { -- GitLab