Skip to content
Snippets Groups Projects
Commit c5b4a96b authored by mechanik20051988's avatar mechanik20051988 Committed by Nikita Pettik
Browse files

iproto: rework queue of pending requests in stream.

Currently first element in queue of pending requests in
stream is a request which was pushed to tx thread for
processing. In this patch special pointer to this request
(stream->current) was implemented and queue of pending
requests now really contains only pending requests.
parent 33830978
No related branches found
No related tags found
No related merge requests found
......@@ -79,6 +79,7 @@ enum {
};
struct iproto_connection;
struct iproto_msg;
struct iproto_stream {
/** Currently active stream transaction or NULL */
......@@ -98,6 +99,11 @@ struct iproto_stream {
* transaction and destroy stream object.
*/
struct cmsg on_disconnect;
/**
* Message currently being processed in the tx thread.
* This field is accesable only from iproto thread.
*/
struct iproto_msg *current;
};
/**
......@@ -641,6 +647,7 @@ iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id)
}
rmean_collect(connection->iproto_thread->rmean, IPROTO_STREAMS, 1);
stream->txn = NULL;
stream->current = NULL;
stailq_create(&stream->pending_requests);
stream->id = stream_id;
stream->connection = connection;
......@@ -680,6 +687,7 @@ iproto_msg_delete(struct iproto_msg *msg)
static void
iproto_stream_delete(struct iproto_stream *stream)
{
assert(stream->current == NULL);
assert(stailq_empty(&stream->pending_requests));
assert(stream->txn == NULL);
mempool_free(&stream->connection->iproto_thread->iproto_stream_pool, stream);
......@@ -833,16 +841,15 @@ iproto_connection_close(struct iproto_connection *con)
struct iproto_stream *stream = (struct iproto_stream *)
mh_i64ptr_node(con->streams, node)->val;
/**
* If stream requests queue is empty, it means that
* that there is some active transaction which was
* not commited yet. We need to rollback it, since
* we push on_disconnect message to tx thread here.
* If stream requests queue is not empty, it means
* that stream processing some request in tx thread
* now. We destroy stream in `net_send_msg` after
* processing all requests.
* If stream->current == NULL and stream requests
* queue is empty, it means that there is some active
* transaction which was not commited yet. We need to
* rollback it, since we push on_disconnect message
* to tx thread here. Otherwise we destroy stream in
* `net_send_msg` after processing all requests.
*/
if (stailq_empty(&stream->pending_requests))
if (stream->current == NULL &&
stailq_empty(&stream->pending_requests))
iproto_stream_rollback_on_disconnect(stream);
}
cpipe_push(&con->iproto_thread->tx_pipe, &con->disconnect_msg);
......@@ -993,15 +1000,12 @@ iproto_msg_start_processing_in_stream(struct iproto_msg *msg)
stream = (struct iproto_stream *)mh_i64ptr_node(con->streams, pos)->val;
assert(stream != NULL);
msg->stream = stream;
/*
* If the request queue in the stream is not empty, it means
* that some previous message wasn't processed yet. Regardless
* of this, we put the message in the queue, but we start processing
* the message only if the message queue in the stream was empty.
*/
bool was_not_empty = !stailq_empty(&stream->pending_requests);
if (stream->current == NULL) {
stream->current = msg;
return 0;
}
stailq_add_tail_entry(&stream->pending_requests, msg, in_stream);
return was_not_empty ? 1 : 0;
return 1;
}
/**
......@@ -2268,11 +2272,8 @@ iproto_msg_finish_processing_in_stream(struct iproto_msg *msg)
if (stream == NULL)
return;
struct iproto_msg *tmp =
stailq_shift_entry(&stream->pending_requests,
struct iproto_msg, in_stream);
assert(tmp == msg);
(void)tmp;
assert(stream->current == msg);
stream->current = NULL;
if (stailq_empty(&stream->pending_requests)) {
/*
......@@ -2298,13 +2299,14 @@ iproto_msg_finish_processing_in_stream(struct iproto_msg *msg)
* If there are new messages for this stream
* then schedule their processing.
*/
struct iproto_msg *next =
stailq_first_entry(&stream->pending_requests,
stream->current =
stailq_shift_entry(&stream->pending_requests,
struct iproto_msg,
in_stream);
assert(next != NULL);
next->wpos = con->wpos;
cpipe_push_input(&con->iproto_thread->tx_pipe, &next->base);
assert(stream->current != NULL);
stream->current->wpos = con->wpos;
cpipe_push_input(&con->iproto_thread->tx_pipe,
&stream->current->base);
cpipe_flush_input(&con->iproto_thread->tx_pipe);
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment