diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 7ef6c631cb153a2bbda74838874344c006737da1..d963dfbbc1fa12826ce2145ceb53a29c8df7993f 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -577,6 +577,11 @@ struct iproto_connection struct ibuf ibuf[2]; /** Pointer to the current buffer. */ struct ibuf *p_ibuf; + /** + * Number of not yet processed messages in the corresponding + * input buffer. + */ + size_t input_msg_count[2]; /** * Two rotating buffers for output. The tx thread switches to * another buffer if it finds it to be empty (flushed out). @@ -950,6 +955,7 @@ iproto_connection_close(struct iproto_connection *con) * is done only once. */ ibuf_discard(con->p_ibuf, con->parse_size); + con->parse_size = 0; mh_int_t node; mh_foreach(con->streams, node) { struct iproto_stream *stream = (struct iproto_stream *) @@ -1184,8 +1190,8 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) msg->p_ibuf = con->p_ibuf; msg->reqstart = reqstart; msg->wpos = con->wpos; - msg->len = reqend - reqstart; /* total request length */ + con->input_msg_count[msg->p_ibuf == &con->ibuf[1]]++; iproto_msg_prepare(msg, &pos, reqend, &stop_input); @@ -1478,6 +1484,8 @@ iproto_connection_new(struct iproto_thread *iproto_thread) ev_io_init(&con->output, iproto_connection_on_output, -1, EV_NONE); ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead); ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead); + con->input_msg_count[0] = 0; + con->input_msg_count[1] = 0; obuf_create(&con->obuf[0], &con->iproto_thread->net_slabc, iproto_readahead); obuf_create(&con->obuf[1], &con->iproto_thread->net_slabc, @@ -1862,6 +1870,28 @@ net_finish_destroy(struct cmsg *m) iproto_connection_delete(con); } +/** Account msg data in connection input buffer as processed. */ +static void +iproto_msg_finish_input(iproto_msg *msg) +{ + struct iproto_connection *con = msg->connection; + struct ibuf *ibuf = msg->p_ibuf; + size_t *count = &con->input_msg_count[msg->p_ibuf == &con->ibuf[1]]; + /* + * Consume data from input buffer only when data of all messages + * is processed because messages process order and order of messages + * in the buffer may differ. + */ + assert(*count != 0); + if (--(*count) == 0) { + size_t processed = ibuf_used(ibuf); + if (ibuf == con->p_ibuf) { + assert(processed >= con->parse_size); + processed -= con->parse_size; + } + ibuf_consume(ibuf, processed); + } +} static void net_discard_input(struct cmsg *m) @@ -1869,7 +1899,7 @@ net_discard_input(struct cmsg *m) struct iproto_msg *msg = container_of(m, struct iproto_msg, discard_input); struct iproto_connection *con = msg->connection; - msg->p_ibuf->rpos += msg->len; + iproto_msg_finish_input(msg); msg->len = 0; con->long_poll_count++; if (con->state == IPROTO_CONNECTION_ALIVE) @@ -2703,7 +2733,7 @@ net_send_msg(struct cmsg *m) iproto_msg_finish_processing_in_stream(msg); if (msg->len != 0) { /* Discard request (see iproto_enqueue_batch()). */ - msg->p_ibuf->rpos += msg->len; + iproto_msg_finish_input(msg); } else { /* Already discarded by net_discard_input(). */ assert(con->long_poll_count > 0); @@ -2739,7 +2769,7 @@ net_end_join(struct cmsg *m) struct iproto_connection *con = msg->connection; struct ibuf *ibuf = msg->p_ibuf; - ibuf->rpos += msg->len; + iproto_msg_finish_input(msg); iproto_msg_delete(msg); assert(! ev_is_active(&con->input)); @@ -2757,7 +2787,7 @@ net_end_subscribe(struct cmsg *m) struct iproto_msg *msg = (struct iproto_msg *) m; struct iproto_connection *con = msg->connection; - msg->p_ibuf->rpos += msg->len; + iproto_msg_finish_input(msg); iproto_msg_delete(msg); assert(! ev_is_active(&con->input));