diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 2c7bb9cac0d53f2961f86d26ff545319d81806d1..ef6c62566a00e1fbce2dcff3038cf11b10d7ea0b 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -50,23 +50,23 @@ #include "stat.h" #include "lua/call.h" -/* {{{ iproto_request - declaration */ +/* {{{ iproto_task - declaration */ struct iproto_connection; -typedef void (*iproto_request_f)(struct iproto_request *); +typedef void (*iproto_task_f)(struct iproto_task *); /** * A single request from the client. All requests * from all clients are queued into a single queue * and processed in FIFO order. */ -struct iproto_request +struct iproto_task { struct iproto_connection *connection; struct iobuf *iobuf; struct session *session; - iproto_request_f process; + iproto_task_f process; /* Request message code and sync. */ struct xrow_header header; /* Box request, if this is a DML */ @@ -74,35 +74,35 @@ struct iproto_request size_t total_len; }; -struct mempool iproto_request_pool; +struct mempool iproto_task_pool; -static struct iproto_request * -iproto_request_new(struct iproto_connection *con, - iproto_request_f process); +static struct iproto_task * +iproto_task_new(struct iproto_connection *con, + iproto_task_f process); static void -iproto_process_connect(struct iproto_request *request); +iproto_process_connect(struct iproto_task *task); static void -iproto_process_disconnect(struct iproto_request *request); +iproto_process_disconnect(struct iproto_task *task); static void -iproto_process(struct iproto_request *request); +iproto_process(struct iproto_task *task); struct IprotoRequestGuard { - struct iproto_request *ireq; - IprotoRequestGuard(struct iproto_request *ireq_arg):ireq(ireq_arg) {} + struct iproto_task *ireq; + IprotoRequestGuard(struct iproto_task *ireq_arg):ireq(ireq_arg) {} ~IprotoRequestGuard() - { if (ireq) mempool_free(&iproto_request_pool, ireq); } - struct iproto_request *release() - { struct iproto_request *tmp = ireq; ireq = NULL; return tmp; } + { if (ireq) mempool_free(&iproto_task_pool, ireq); } + struct iproto_task *release() + { struct iproto_task *tmp = ireq; ireq = NULL; return tmp; } }; /* }}} */ /* {{{ iproto_queue */ -struct iproto_request; +struct iproto_task; enum { IPROTO_REQUEST_QUEUE_SIZE = 2048, }; @@ -120,14 +120,14 @@ enum { IPROTO_REQUEST_QUEUE_SIZE = 2048, }; struct iproto_queue { /** Ring buffer of fixed size */ - struct iproto_request *queue[IPROTO_REQUEST_QUEUE_SIZE]; + struct iproto_task *queue[IPROTO_REQUEST_QUEUE_SIZE]; /** - * Cache of fibers which work on requests + * Cache of fibers which work on tasks * in this queue. */ struct rlist fiber_cache; /** - * Used to trigger request processing when + * Used to trigger task processing when * the queue becomes non-empty. */ struct ev_async watcher; @@ -144,8 +144,8 @@ iproto_queue_is_empty(struct iproto_queue *i_queue) } /** - * A single global queue for all requests in all connections. All - * requests are processed concurrently. + * A single global queue for all tasks in all connections. All + * tasks are processed concurrently. * Is also used as a queue for just established connections and to * execute disconnect triggers. A few notes about these triggers: * - they need to be run in a fiber @@ -154,34 +154,34 @@ iproto_queue_is_empty(struct iproto_queue *i_queue) * - on_connect trigger must be processed before any other * request on this connection. */ -static struct iproto_queue request_queue; +static struct iproto_queue tx_queue; static void iproto_queue_push(struct iproto_queue *i_queue, - struct iproto_request *request) + struct iproto_task *task) { /* If the queue is full, invoke the handler to work it off. */ if (i_queue->end == i_queue->size) ev_invoke(loop(), &i_queue->watcher, EV_CUSTOM); assert(i_queue->end < i_queue->size); /* - * There were some queued requests, ensure they are + * There were some queued tasks, ensure they are * handled. */ if (iproto_queue_is_empty(i_queue)) - ev_feed_event(loop(), &request_queue.watcher, EV_CUSTOM); - i_queue->queue[i_queue->end++] = request; + ev_feed_event(loop(), &tx_queue.watcher, EV_CUSTOM); + i_queue->queue[i_queue->end++] = task; } -static struct iproto_request * +static struct iproto_task * iproto_queue_pop(struct iproto_queue *i_queue) { if (i_queue->begin == i_queue->end) return NULL; - struct iproto_request *request = i_queue->queue[i_queue->begin++]; + struct iproto_task *task = i_queue->queue[i_queue->begin++]; if (i_queue->begin == i_queue->end) i_queue->begin = i_queue->end = 0; - return request; + return task; } /** @@ -192,13 +192,13 @@ static void iproto_queue_handler(va_list ap) { struct iproto_queue *i_queue = va_arg(ap, struct iproto_queue *); - struct iproto_request *request; + struct iproto_task *task; restart: - while ((request = iproto_queue_pop(i_queue))) { - IprotoRequestGuard guard(request); - fiber_set_session(fiber(), request->session); - fiber_set_user(fiber(), &request->session->credentials); - request->process(request); + while ((task = iproto_queue_pop(i_queue))) { + IprotoRequestGuard guard(task); + fiber_set_session(fiber(), task->session); + fiber_set_user(fiber(), &task->session->credentials); + task->process(task); } /** Put the current fiber into a queue fiber cache. */ rlist_add_entry(&i_queue->fiber_cache, fiber(), state); @@ -255,10 +255,10 @@ struct iproto_connection struct iobuf *iobuf[2]; /* * Size of readahead which is not parsed yet, i.e. - * size of a piece of request which is not fully read. + * size of a piece of task which is not fully read. * Is always relative to iobuf[0]->in.end. In other words, * iobuf[0]->in.end - parse_size gives the start of the - * unparsed request. A size rather than a pointer is used + * unparsed task. A size rather than a pointer is used * to be safe in case in->buf is reallocated. Being * relative to in->end, rather than to in->pos is helpful to * make sure ibuf_reserve() or iobuf rotation don't make @@ -268,8 +268,8 @@ struct iproto_connection /** Current write position in the output buffer */ struct obuf_svp write_pos; /** - * Function of the request processor to handle - * a single request. + * Function of the task processor to handle + * a single task. */ struct ev_io input; struct ev_io output; @@ -277,15 +277,15 @@ struct iproto_connection struct session *session; uint64_t cookie; ev_loop *loop; - /* Pre-allocated disconnect request. */ - struct iproto_request *disconnect; + /* Pre-allocated disconnect task. */ + struct iproto_task *disconnect; }; static struct mempool iproto_connection_pool; /** * A connection is idle when the client is gone - * and there are no outstanding requests in the request queue. + * and there are no outstanding tasks in the task queue. * An idle connection can be safely garbage collected. * Note: a connection only becomes idle after iproto_connection_close(), * which closes the fd. This is why here the check is for @@ -323,7 +323,7 @@ iproto_connection_new(const char *name, int fd, struct sockaddr *addr) con->session = NULL; con->cookie = *(uint64_t *) addr; /* It may be very awkward to allocate at close. */ - con->disconnect = iproto_request_new(con, iproto_process_disconnect); + con->disconnect = iproto_task_new(con, iproto_process_disconnect); return con; } @@ -341,7 +341,7 @@ iproto_connection_delete(struct iproto_connection *con) iobuf_delete(con->iobuf[0]); iobuf_delete(con->iobuf[1]); if (con->disconnect) - mempool_free(&iproto_request_pool, con->disconnect); + mempool_free(&iproto_task_pool, con->disconnect); mempool_free(&iproto_connection_pool, con); } @@ -358,16 +358,16 @@ iproto_connection_shutdown(struct iproto_connection *con) con->iobuf[0]->in.end -= con->parse_size; /* * If the con is not idle, it is destroyed - * after the last request is handled. Otherwise, - * queue a separate request to run on_disconnect() + * after the last task is handled. Otherwise, + * queue a separate task to run on_disconnect() * trigger and destroy the connection. * Sic: the check is mandatory to not destroy a connection * twice. */ if (iproto_connection_is_idle(con)) { - struct iproto_request *ireq = con->disconnect; + struct iproto_task *ireq = con->disconnect; con->disconnect = NULL; - iproto_queue_push(&request_queue, ireq); + iproto_queue_push(&tx_queue, ireq); } } @@ -470,8 +470,8 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) const char *reqend = pos + len; if (reqend > in->end) break; - struct iproto_request *ireq = - iproto_request_new(con, iproto_process); + struct iproto_task *ireq = + iproto_task_new(con, iproto_process); IprotoRequestGuard guard(ireq); xrow_header_decode(&ireq->header, &pos, reqend); @@ -495,7 +495,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) ireq->header.body[0].iov_len); } ireq->request.header = &ireq->header; - iproto_queue_push(&request_queue, guard.release()); + iproto_queue_push(&tx_queue, guard.release()); /* Request will be discarded in iproto_process_XXX */ /* Request is parsed */ @@ -630,7 +630,7 @@ iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher, /* {{{ iproto_process_* functions */ static void -iproto_process(struct iproto_request *ireq) +iproto_process(struct iproto_task *ireq) { struct iobuf *iobuf = ireq->iobuf; struct obuf *out = &iobuf->out; @@ -721,12 +721,12 @@ iproto_process(struct iproto_request *ireq) } } -static struct iproto_request * -iproto_request_new(struct iproto_connection *con, - iproto_request_f process) +static struct iproto_task * +iproto_task_new(struct iproto_connection *con, + iproto_task_f process) { - struct iproto_request *ireq = - (struct iproto_request *) mempool_alloc(&iproto_request_pool); + struct iproto_task *ireq = + (struct iproto_task *) mempool_alloc(&iproto_task_pool); ireq->connection = con; ireq->iobuf = con->iobuf[0]; ireq->session = con->session; @@ -753,7 +753,7 @@ iproto_greeting(const char *salt) * upon a failure. */ static void -iproto_process_connect(struct iproto_request *request) +iproto_process_connect(struct iproto_task *request) { struct iproto_connection *con = request->connection; struct iobuf *iobuf = request->iobuf; @@ -789,7 +789,7 @@ iproto_process_connect(struct iproto_request *request) } static void -iproto_process_disconnect(struct iproto_request *request) +iproto_process_disconnect(struct iproto_task *request) { /* Runs the trigger, which may yield. */ iproto_connection_delete(request->connection); @@ -812,22 +812,22 @@ iproto_on_accept(struct evio_service * /* service */, int fd, con = iproto_connection_new(name, fd, addr); /* - * Ignore request allocation failure - the queue size is - * fixed so there is a limited number of requests in + * Ignore task allocation failure - the queue size is + * fixed so there is a limited number of tasks in * use, all stored in just a few blocks of the memory pool. */ - struct iproto_request *ireq = - iproto_request_new(con, iproto_process_connect); - iproto_queue_push(&request_queue, ireq); + struct iproto_task *ireq = + iproto_task_new(con, iproto_process_connect); + iproto_queue_push(&tx_queue, ireq); } /** Initialize a read-write port. */ void iproto_init(struct evio_service *service) { - mempool_create(&iproto_request_pool, &cord()->slabc, - sizeof(struct iproto_request)); - iproto_queue_init(&request_queue); + mempool_create(&iproto_task_pool, &cord()->slabc, + sizeof(struct iproto_task)); + iproto_queue_init(&tx_queue); mempool_create(&iproto_connection_pool, &cord()->slabc, sizeof(struct iproto_connection));