diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 0353efdb01e91a1a8d9c00bfb1df8006ec74c7c9..d0e39a5469d203ed1fc2285279927e7742cf0d29 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -44,6 +44,7 @@ set (core_sources say.cc memory.cc fiber.cc + cbus.cc exception.cc coro.cc object.cc diff --git a/src/box/errcode.h b/src/box/errcode.h index 24226031a1103e2ee76ffb38c55af1f75adb15cc..cc09e1a9007265ce775341bbb463c752815d3a2f 100644 --- a/src/box/errcode.h +++ b/src/box/errcode.h @@ -58,7 +58,7 @@ struct errcode_record { /* 4 */_(ER_TUPLE_NOT_FOUND, 2, "Tuple doesn't exist in index '%s' in space '%s'") \ /* 5 */_(ER_UNSUPPORTED, 2, "%s does not support %s") \ /* 6 */_(ER_NONMASTER, 2, "Can't modify data on a replication slave. My master is: %s") \ - /* 7 */_(ER_READONLY, 2, "Can't modify data because this server in read-only mode.") \ + /* 7 */_(ER_READONLY, 2, "Can't modify data because this server is in read-only mode.") \ /* 8 */_(ER_INJECTION, 2, "Error injection '%s'") \ /* 9 */_(ER_CREATE_SPACE, 2, "Failed to create space '%s': %s") \ /* 10 */_(ER_SPACE_EXISTS, 2, "Space '%s' already exists") \ diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 75042a8f8cbfac3def0e61eed4b94d802ad917f7..f60fca845bb6df229027b49e296ef287fdee529a 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -35,6 +35,7 @@ #include "iproto_port.h" #include "main.h" #include "fiber.h" +#include "cbus.h" #include "say.h" #include "evio.h" #include "scoped_guard.h" @@ -50,24 +51,18 @@ #include "stat.h" #include "lua/call.h" -/* {{{ iproto_task - declaration */ - -typedef void (*iproto_task_f)(struct iproto_task *); +/* {{{ iproto_msg - declaration */ /** - * A single task from io thread. All requests + * A single msg from io thread. All requests * from all connections are queued into a single queue * and processed in FIFO order. */ -struct iproto_task +struct iproto_msg: public cmsg { - STAILQ_ENTRY(iproto_task) fifo; - /** Function to handle a single task. */ - iproto_task_f process; - /** --- Members common to all tasks --- */ struct iproto_connection *connection; - /* --- Box tasks - actual requests for the transaction processor --- */ + /* --- Box msgs - actual requests for the transaction processor --- */ /* Request message code and sync. */ struct xrow_header header; /* Box request, if this is a DML */ @@ -81,121 +76,51 @@ struct iproto_task /** * How much space the request takes in the * input buffer (len, header and body - all of it) + * This also works as a reference counter to + * iproto_connection object. */ size_t len; -}; - -STAILQ_HEAD(iproto_fifo, iproto_task); - -static struct mempool iproto_task_pool; - -static struct iproto_task * -iproto_task_new(struct iproto_connection *con, - iproto_task_f process) -{ - struct iproto_task *task = - (struct iproto_task *) mempool_alloc(&iproto_task_pool); - task->connection = con; - task->process = process; - return task; -} - -struct IprotoTaskGuard { - struct iproto_task *task; - IprotoTaskGuard(struct iproto_task *task_arg):task(task_arg) {} - ~IprotoTaskGuard() - { if (task) mempool_free(&iproto_task_pool, task); } - struct iproto_task *release() - { struct iproto_task *tmp = task; task = NULL; return tmp; } -}; - -/* }}} */ - -/* {{{ iproto_queue */ - -enum { - IPROTO_FIBER_POOL_SIZE = 1024, - IPROTO_FIBER_CACHE_SIZE = IPROTO_FIBER_POOL_SIZE * 2 -}; - -/** - * Implementation of an input queue of the box request processor. - * - * Event handlers read data, determine request boundaries - * and enqueue requests. 
Once all input/output events are - * processed, an own handler is invoked to deal with the - * requests in the queue. It leases a fiber from a pool - * and runs the request in the fiber. - * - * @sa iproto_queue_schedule - */ -struct iproto_queue -{ - /** Ring buffer of fixed size */ - struct iproto_fifo queue; - /** Cache of fibers which work on tasks in this queue. */ - struct rlist fiber_cache; - /** The number of fibers in the cache */ - int cache_size; - /** The number of fibers working on tasks. */ - int pool_size; + /** End of write position in the output buffer */ + struct obuf_svp write_end; /** - * Used to trigger task processing when - * the queue becomes non-empty. + * Used in "connect" msgs, true if connect trigger failed + * and the connection must be closed. */ - struct ev_async watcher; + bool close_connection; }; -static inline bool -iproto_queue_is_empty(struct iproto_queue *i_queue) -{ - return STAILQ_EMPTY(&i_queue->queue); -} +static struct mempool iproto_msg_pool; -static inline bool -iproto_queue_needs_throttling(struct iproto_queue *i_queue) +static struct iproto_msg * +iproto_msg_new(struct iproto_connection *con, struct cmsg_hop *route) { - return i_queue->pool_size > IPROTO_FIBER_POOL_SIZE; + struct iproto_msg *msg = + (struct iproto_msg *) mempool_alloc(&iproto_msg_pool); + cmsg_init(msg, route); + msg->connection = con; + return msg; } -static void -iproto_queue_push(struct iproto_queue *i_queue, struct iproto_task *task) +static inline void +iproto_msg_delete(struct cmsg *msg) { - /* Trigger task processing when the queue becomes non-empty. */ - if (iproto_queue_is_empty(i_queue)) - ev_feed_event(loop(), &i_queue->watcher, EV_CUSTOM); - STAILQ_INSERT_TAIL(&i_queue->queue, task, fifo); + mempool_free(&iproto_msg_pool, msg); } -static struct iproto_task * -iproto_queue_pop(struct iproto_queue *i_queue) -{ - if (iproto_queue_is_empty(i_queue)) - return NULL; - struct iproto_task *task = STAILQ_FIRST(&i_queue->queue); - STAILQ_REMOVE_HEAD(&i_queue->queue, fifo); - return task; -} +struct IprotoMsgGuard { + struct iproto_msg *msg; + IprotoMsgGuard(struct iproto_msg *msg_arg):msg(msg_arg) {} + ~IprotoMsgGuard() + { if (msg) iproto_msg_delete(msg); } + struct iproto_msg *release() + { struct iproto_msg *tmp = msg; msg = NULL; return tmp; } +}; -static inline void -iproto_queue_init(struct iproto_queue *i_queue, ev_async_cb cb) -{ - STAILQ_INIT(&i_queue->queue); - /** - * Initialize an ev_async event which would start - * workers for all outstanding tasks. - */ - ev_async_init(&i_queue->watcher, cb); - i_queue->watcher.data = i_queue; - rlist_create(&i_queue->fiber_cache); - i_queue->cache_size = 0; - i_queue->pool_size = 0; -} +enum { IPROTO_FIBER_POOL_SIZE = 1024 }; /* }}} */ - -/* {{{ iproto_connection */ +/* {{{ iproto connection and requests */ /** * A single global queue for all requests in all connections. All @@ -208,7 +133,11 @@ iproto_queue_init(struct iproto_queue *i_queue, ev_async_cb cb) * - on_connect trigger must be processed before any other * request on this connection. */ -static struct iproto_queue tx_queue; +static struct cpipe tx_pipe; +static struct cpipe net_pipe; +static struct cbus net_tx_bus; +/* A pointer to the transaction processor cord. */ +struct cord *tx_cord; /** Context of a single client connection. */ struct iproto_connection @@ -233,34 +162,37 @@ struct iproto_connection * the value meaningless. 
*/ ssize_t parse_size; - /** Current write position in the output buffer */ - struct obuf_svp write_pos; struct ev_io input; struct ev_io output; /** Logical session. */ struct session *session; uint64_t cookie; ev_loop *loop; - /* Pre-allocated disconnect task. */ - struct iproto_task *disconnect; + /* Pre-allocated disconnect msg. */ + struct iproto_msg *disconnect; }; static struct mempool iproto_connection_pool; /** * A connection is idle when the client is gone - * and there are no outstanding tasks in the task queue. + * and there are no outstanding msgs in the msg queue. * An idle connection can be safely garbage collected. * Note: a connection only becomes idle after iproto_connection_close(), * which closes the fd. This is why here the check is for - * evio_is_active() (false if fd is closed), not ev_is_active() - * (false if event is not started). + * evio_has_fd(), not ev_is_active() (false if event is not + * started). + * + * ibuf_size() provides an effective reference counter + * on connection use in the tx request queue. Any request + * in the request queue has a non-zero len, and ibuf_size() + * is therefore non-zero as long as there is at least + * one request in the tx queue. */ static inline bool iproto_connection_is_idle(struct iproto_connection *con) { - return !evio_is_active(&con->input) && - ibuf_used(&con->iobuf[0]->in) == 0 && + return ibuf_used(&con->iobuf[0]->in) == 0 && ibuf_used(&con->iobuf[1]->in) == 0; } @@ -276,29 +208,74 @@ static inline void iproto_connection_delete(struct iproto_connection *con) { assert(iproto_connection_is_idle(con)); - assert(!evio_is_active(&con->output)); + assert(!evio_has_fd(&con->output)); + assert(!evio_has_fd(&con->input)); + assert(con->session == NULL); + /* + * The output buffers must have been deleted + * in tx thread. + */ + iobuf_delete_mt(con->iobuf[0]); + iobuf_delete_mt(con->iobuf[1]); + if (con->disconnect) + iproto_msg_delete(con->disconnect); + mempool_free(&iproto_connection_pool, con); +} + +static void +tx_process_msg(struct cmsg *msg); + +static void +net_send_msg(struct cmsg *msg); + +/** + * Fire on_disconnect triggers in the tx + * thread and destroy the session object, + * as well as output buffers of the connection. + */ +static void +tx_process_disconnect(struct cmsg *m) +{ + struct iproto_msg *msg = (struct iproto_msg *) m; + struct iproto_connection *con = msg->connection; if (con->session) { if (! rlist_empty(&session_on_disconnect)) session_run_on_disconnect_triggers(con->session); session_destroy(con->session); + con->session = NULL; /* safety */ } - iobuf_delete(con->iobuf[0]); - iobuf_delete(con->iobuf[1]); - if (con->disconnect) - mempool_free(&iproto_task_pool, con->disconnect); - mempool_free(&iproto_connection_pool, con); + /* + * Got to be done in iproto thread since + * that's where the memory is allocated. + */ + obuf_destroy(&con->iobuf[0]->out); + obuf_destroy(&con->iobuf[1]->out); } +/** + * Cleanup the net thread resources of a connection + * and close the connection. + */ static void -iproto_process(struct iproto_task *task); - -static void -iproto_process_disconnect(struct iproto_task *task) +net_finish_disconnect(struct cmsg *m) { + struct iproto_msg *msg = (struct iproto_msg *) m; /* Runs the trigger, which may yield. 
*/ - iproto_connection_delete(task->connection); + iproto_connection_delete(msg->connection); + iproto_msg_delete(msg); } +static struct cmsg_hop disconnect_route[] = { + { tx_process_disconnect, &net_pipe }, + { net_finish_disconnect, NULL }, +}; + + +static struct cmsg_hop request_route[] = { + { tx_process_msg, &net_pipe }, + { net_send_msg, NULL }, +}; + static struct iproto_connection * iproto_connection_new(const char *name, int fd, struct sockaddr *addr) { @@ -309,14 +286,13 @@ iproto_connection_new(const char *name, int fd, struct sockaddr *addr) con->loop = loop(); ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ); ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE); - con->iobuf[0] = iobuf_new(); - con->iobuf[1] = iobuf_new(); + con->iobuf[0] = iobuf_new_mt(&tx_cord->slabc); + con->iobuf[1] = iobuf_new_mt(&tx_cord->slabc); con->parse_size = 0; - con->write_pos = obuf_create_svp(&con->iobuf[0]->out); con->session = NULL; con->cookie = *(uint64_t *) addr; /* It may be very awkward to allocate at close. */ - con->disconnect = iproto_task_new(con, iproto_process_disconnect); + con->disconnect = iproto_msg_new(con, disconnect_route); return con; } @@ -335,15 +311,15 @@ iproto_connection_close(struct iproto_connection *con) /* * If the con is not idle, it is destroyed * after the last request is handled. Otherwise, - * queue a separate task to run on_disconnect() + * queue a separate msg to run on_disconnect() * trigger and destroy the connection. * Sic: the check is mandatory to not destroy a connection * twice. */ if (iproto_connection_is_idle(con)) { - struct iproto_task *task = con->disconnect; + struct iproto_msg *msg = con->disconnect; con->disconnect = NULL; - iproto_queue_push(&tx_queue, task); + cpipe_push(&tx_pipe, msg); } close(fd); } @@ -425,6 +401,7 @@ iproto_connection_input_iobuf(struct iproto_connection *con) static inline void iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) { + bool stop_input = false; while (true) { const char *reqstart = in->wpos - con->parse_size; const char *pos = reqstart; @@ -439,41 +416,54 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) const char *reqend = pos + len; if (reqend > in->wpos) break; - struct iproto_task *task = - iproto_task_new(con, iproto_process); - task->iobuf = con->iobuf[0]; - IprotoTaskGuard guard(task); + struct iproto_msg *msg = iproto_msg_new(con, request_route); + msg->iobuf = con->iobuf[0]; + IprotoMsgGuard guard(msg); - xrow_header_decode(&task->header, &pos, reqend); + xrow_header_decode(&msg->header, &pos, reqend); assert(pos == reqend); - task->len = reqend - reqstart; /* total request length */ - + msg->len = reqend - reqstart; /* total request length */ /* * sic: in case of exception con->parse_size - * as well as in->rpos must not be advanced, to - * stay in sync. + * must not be advanced to stay in sync with + * in->rpos. 
*/ - if (task->header.type >= IPROTO_SELECT && - task->header.type <= IPROTO_EVAL) { + if (msg->header.type >= IPROTO_SELECT && + msg->header.type <= IPROTO_EVAL) { /* Pre-parse request before putting it into the queue */ - if (task->header.bodycnt == 0) { + if (msg->header.bodycnt == 0) { tnt_raise(ClientError, ER_INVALID_MSGPACK, "request type"); } - request_create(&task->request, task->header.type); - pos = (const char *) task->header.body[0].iov_base; - request_decode(&task->request, pos, - task->header.body[0].iov_len); + request_create(&msg->request, msg->header.type); + pos = (const char *) msg->header.body[0].iov_base; + request_decode(&msg->request, pos, + msg->header.body[0].iov_len); + } else if (msg->header.type == IPROTO_SUBSCRIBE || + msg->header.type == IPROTO_JOIN) { + /** + * Don't mess with the file descriptor + * while join is running. + */ + ev_io_stop(con->loop, &con->output); + ev_io_stop(con->loop, &con->input); + stop_input = true; } - task->request.header = &task->header; - iproto_queue_push(&tx_queue, guard.release()); - /* Request will be discarded in iproto_process_XXX */ + msg->request.header = &msg->header; + cpipe_push_input(&tx_pipe, guard.release()); /* Request is parsed */ con->parse_size -= reqend - reqstart; - if (con->parse_size == 0) + if (con->parse_size == 0 || stop_input) break; } + cpipe_flush_input(&tx_pipe); + /* + * Keep reading input, as long as the socket + * supplies data. + */ + if (!stop_input && !ev_is_active(&con->input)) + ev_feed_event(con->loop, &con->input, EV_READ); } static void @@ -509,12 +499,6 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher, con->parse_size += nrd; /* Enqueue all requests which are fully read up. */ iproto_enqueue_batch(con, in); - /* - * Keep reading input, as long as the socket - * supplies data. - */ - if (!ev_is_active(&con->input)) - ev_feed_event(loop, &con->input, EV_READ); } catch (Exception *e) { e->log(); iproto_connection_close(con); @@ -525,8 +509,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher, static inline struct iobuf * iproto_connection_output_iobuf(struct iproto_connection *con) { - if (obuf_used(&con->iobuf[1]->out) && - obuf_used(&con->iobuf[1]->out) > con->write_pos.used) + if (obuf_used(&con->iobuf[1]->out) > 0) return con->iobuf[1]; /* * Don't try to write from a newer buffer if an older one @@ -535,39 +518,45 @@ iproto_connection_output_iobuf(struct iproto_connection *con) * pieces of replies from both buffers. */ if (ibuf_used(&con->iobuf[1]->in) == 0 && - obuf_used(&con->iobuf[0]->out) && - obuf_used(&con->iobuf[0]->out) > con->write_pos.used) + obuf_used(&con->iobuf[0]->out) > 0) return con->iobuf[0]; return NULL; } -/** writev() to the socket and handle the output. */ +/** writev() to the socket and handle the result. */ + static int -iproto_flush(struct iobuf *iobuf, int fd, struct obuf_svp *svp) +iproto_flush(struct iobuf *iobuf, struct iproto_connection *con) { - /* Begin writing from the saved position. 
*/ - struct iovec *iov = iobuf->out.iov + svp->pos; - int iovcnt = obuf_iovcnt(&iobuf->out) - svp->pos; - assert(iovcnt); - ssize_t nwr; - try { - sio_add_to_iov(iov, -svp->iov_len); - nwr = sio_writev(fd, iov, iovcnt); - - sio_add_to_iov(iov, svp->iov_len); - } catch (SocketError *) { - sio_add_to_iov(iov, svp->iov_len); - throw; - } + int fd = con->output.fd; + struct obuf_svp *begin = &iobuf->out.wpos; + struct obuf_svp *end = &iobuf->out.wend; + assert(begin->used < end->used); + struct iovec iov[SMALL_OBUF_IOV_MAX+1]; + struct iovec *src = iobuf->out.iov; + int iovcnt = end->pos - begin->pos + 1; + /* + * iov[i].iov_len may be concurrently modified in tx thread, + * but only for the last position. + */ + memcpy(iov, src + begin->pos, iovcnt * sizeof(struct iovec)); + sio_add_to_iov(iov, -begin->iov_len); + /* *Overwrite* iov_len of the last pos as it may be garbage. */ + iov[iovcnt-1].iov_len = end->iov_len - begin->iov_len * (iovcnt == 1); + ssize_t nwr = sio_writev(fd, iov, iovcnt); if (nwr > 0) { - if (svp->used + nwr == obuf_used(&iobuf->out)) { - iobuf_reset(iobuf); - *svp = obuf_create_svp(&iobuf->out); + if (begin->used + nwr == end->used) { + /* Quickly recycle the buffer if it's idle. */ + if (ibuf_used(&iobuf->in) == 0) { + assert(end->used == obuf_size(&iobuf->out)); + iobuf_reset(iobuf); + } + *begin = *end; /* advance write position */ return 0; } - svp->used += nwr; - svp->pos += sio_move_iov(iov, nwr, &svp->iov_len); + begin->used += nwr; /* advance write position */ + begin->pos += sio_move_iov(iov, nwr, &begin->iov_len); } return -1; } @@ -577,13 +566,11 @@ iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher, int /* revents */) { struct iproto_connection *con = (struct iproto_connection *) watcher->data; - int fd = con->output.fd; - struct obuf_svp *svp = &con->write_pos; try { struct iobuf *iobuf; while ((iobuf = iproto_connection_output_iobuf(con))) { - if (iproto_flush(iobuf, fd, svp) < 0) { + if (iproto_flush(iobuf, con) < 0) { ev_io_start(loop, &con->output); return; } @@ -591,113 +578,36 @@ iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher, ev_feed_event(loop, &con->input, EV_READ); } if (ev_is_active(&con->output)) - ev_io_stop(loop, &con->output); + ev_io_stop(con->loop, &con->output); } catch (Exception *e) { e->log(); iproto_connection_close(con); } } -/* }}} */ - -/** {{{ tx_queue handlers */ -/** - * Main function of the fiber invoked to handle all outstanding - * tasks in a queue. - */ -static void -iproto_tx_queue_fiber(va_list ap) -{ - struct iproto_queue *i_queue = va_arg(ap, struct iproto_queue *); - struct iproto_task *task; - i_queue->pool_size++; - auto size_guard = make_scoped_guard([=]{ i_queue->pool_size--; }); -restart: - while ((task = iproto_queue_pop(i_queue))) { - IprotoTaskGuard guard(task); - struct session *session = task->connection->session; - fiber_set_session(fiber(), session); - fiber_set_user(fiber(), &session->credentials); - task->process(task); - } - if (i_queue->cache_size < IPROTO_FIBER_CACHE_SIZE) { - /** Put the current fiber into a queue fiber cache. */ - rlist_add_entry(&i_queue->fiber_cache, fiber(), state); - i_queue->pool_size--; - i_queue->cache_size++; - fiber_yield(); - i_queue->cache_size--; - i_queue->pool_size++; - goto restart; - } -} - -/** Create fibers to handle all outstanding tasks. */ -static void -iproto_tx_queue_cb(ev_loop * /* loop */, struct ev_async *watcher, - int /* events */) -{ - struct iproto_queue *i_queue = (struct iproto_queue *) watcher->data; - while (! 
iproto_queue_is_empty(i_queue)) { - struct fiber *f; - if (! rlist_empty(&i_queue->fiber_cache)) { - f = rlist_shift_entry(&i_queue->fiber_cache, - struct fiber, state); - } else if (! iproto_queue_needs_throttling(i_queue)) { - f = fiber_new("iproto", iproto_tx_queue_fiber); - } else { - /** - * No worries that this watcher may not - * get scheduled again - there are enough - * worker fibers already, so just leave. - */ - break; - } - fiber_start(f, i_queue); - } -} - -/* }}} */ - -/* {{{ iproto_process_* functions */ - static void -iproto_process(struct iproto_task *task) +tx_process_msg(struct cmsg *m) { - struct iobuf *iobuf = task->iobuf; - struct obuf *out = &iobuf->out; - struct iproto_connection *con = task->connection; - - auto scope_guard = make_scoped_guard([=]{ - /* Discard request (see iproto_enqueue_batch()) */ - iobuf->in.rpos += task->len; - - if (evio_is_active(&con->output)) { - if (! ev_is_active(&con->output)) - ev_feed_event(con->loop, - &con->output, - EV_WRITE); - } else if (iproto_connection_is_idle(con)) { - iproto_connection_delete(con); - } - }); - - if (unlikely(! evio_is_active(&con->output))) - return; - - con->session->sync = task->header.sync; + struct iproto_msg *msg = (struct iproto_msg *) m; + struct obuf *out = &msg->iobuf->out; + struct iproto_connection *con = msg->connection; + struct session *session = msg->connection->session; + fiber_set_session(fiber(), session); + fiber_set_user(fiber(), &session->credentials); + + session->sync = msg->header.sync; try { - switch (task->header.type) { + switch (msg->header.type) { case IPROTO_SELECT: case IPROTO_INSERT: case IPROTO_REPLACE: case IPROTO_UPDATE: case IPROTO_DELETE: - assert(task->request.type == task->header.type); + assert(msg->request.type == msg->header.type); struct iproto_port port; - iproto_port_init(&port, out, task->header.sync); + iproto_port_init(&port, out, msg->header.sync); try { - box_process(&task->request, (struct port *) &port); + box_process(&msg->request, (struct port *) &port); } catch (Exception *e) { /* * This only works if there are no @@ -712,27 +622,27 @@ iproto_process(struct iproto_task *task) } break; case IPROTO_CALL: - assert(task->request.type == task->header.type); - stat_collect(stat_base, task->request.type, 1); - box_lua_call(&task->request, out); + assert(msg->request.type == msg->header.type); + stat_collect(stat_base, msg->request.type, 1); + box_lua_call(&msg->request, out); break; case IPROTO_EVAL: - assert(task->request.type == task->header.type); - stat_collect(stat_base, task->request.type, 1); - box_lua_eval(&task->request, out); + assert(msg->request.type == msg->header.type); + stat_collect(stat_base, msg->request.type, 1); + box_lua_eval(&msg->request, out); break; case IPROTO_AUTH: { - assert(task->request.type == task->header.type); - const char *user = task->request.key; + assert(msg->request.type == msg->header.type); + const char *user = msg->request.key; uint32_t len = mp_decode_strl(&user); - authenticate(user, len, task->request.tuple, - task->request.tuple_end); - iproto_reply_ok(out, task->header.sync); + authenticate(user, len, msg->request.tuple, + msg->request.tuple_end); + iproto_reply_ok(out, msg->header.sync); break; } case IPROTO_PING: - iproto_reply_ok(out, task->header.sync); + iproto_reply_ok(out, msg->header.sync); break; case IPROTO_JOIN: /* @@ -740,28 +650,49 @@ iproto_process(struct iproto_task *task) * lambda in the beginning of the block * will re-activate the watchers for us. 
*/ - ev_io_stop(con->loop, &con->input); - ev_io_stop(con->loop, &con->output); - box_process_join(con->input.fd, &task->header); + box_process_join(con->input.fd, &msg->header); break; case IPROTO_SUBSCRIBE: - ev_io_stop(con->loop, &con->input); - ev_io_stop(con->loop, &con->output); /* * Subscribe never returns - unless there * is an error/exception. In that case * the write watcher will be re-activated * the same way as for JOIN. */ - box_process_subscribe(con->input.fd, &task->header); + box_process_subscribe(con->input.fd, &msg->header); break; default: tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE, - (uint32_t) task->header.type); + (uint32_t) msg->header.type); } } catch (Exception *e) { - iproto_reply_error(out, e, task->header.sync); + iproto_reply_error(out, e, msg->header.sync); } + msg->write_end = obuf_create_svp(out); +} + +static void +net_send_msg(struct cmsg *m) +{ + struct iproto_msg *msg = (struct iproto_msg *) m; + struct iproto_connection *con = msg->connection; + struct iobuf *iobuf = msg->iobuf; + /* Discard request (see iproto_enqueue_batch()) */ + iobuf->in.rpos += msg->len; + iobuf->out.wend = msg->write_end; + if ((msg->header.type == IPROTO_SUBSCRIBE || + msg->header.type == IPROTO_JOIN)) { + assert(! ev_is_active(&con->input)); + ev_io_start(con->loop, &con->input); + } + + if (evio_has_fd(&con->output)) { + if (! ev_is_active(&con->output)) + ev_feed_event(con->loop, &con->output, EV_WRITE); + } else if (iproto_connection_is_idle(con)) { + iproto_connection_close(con); + } + iproto_msg_delete(msg); } const char * @@ -783,45 +714,67 @@ iproto_greeting(const char *salt) * upon a failure. */ static void -iproto_process_connect(struct iproto_task *task) +tx_process_connect(struct cmsg *m) { - struct iproto_connection *con = task->connection; - struct iobuf *iobuf = task->iobuf; - int fd = con->input.fd; + struct iproto_msg *msg = (struct iproto_msg *) m; + struct iproto_connection *con = msg->connection; + struct obuf *out = &msg->iobuf->out; try { /* connect. */ - con->session = session_create(fd, con->cookie); - coio_write(&con->input, iproto_greeting(con->session->salt), - IPROTO_GREETING_SIZE); + con->session = session_create(con->input.fd, con->cookie); + obuf_dup(out, iproto_greeting(con->session->salt), + IPROTO_GREETING_SIZE); if (! rlist_empty(&session_on_connect)) session_run_on_connect_triggers(con->session); - } catch (SocketError *e) { - e->log(); - iproto_connection_close(con); - return; + msg->write_end = obuf_create_svp(out); } catch (Exception *e) { - iproto_reply_error(&iobuf->out, e, task->header.type); + iproto_reply_error(out, e, 0 /* zero sync for connect error */); + msg->close_connection = true; + } +} + +/** + * Send a response to connect to the client or close the + * connection in case on_connect trigger failed. + */ +static void +net_send_greeting(struct cmsg *m) +{ + struct iproto_msg *msg = (struct iproto_msg *) m; + struct iproto_connection *con = msg->connection; + if (msg->close_connection) { + struct obuf *out = &msg->iobuf->out; try { - iproto_flush(iobuf, fd, &con->write_pos); + sio_writev(con->output.fd, out->iov, + obuf_iovcnt(out)); } catch (Exception *e) { e->log(); } + assert(iproto_connection_is_idle(con)); iproto_connection_close(con); + iproto_msg_delete(msg); return; } + con->iobuf[0]->out.wend = msg->write_end; /* * Connect is synchronous, so no one could have been * messing up with the connection while it was in * progress. 
*/ - assert(evio_is_active(&con->input)); + assert(evio_has_fd(&con->output)); /* Handshake OK, start reading input. */ - ev_feed_event(con->loop, &con->input, EV_READ); + ev_feed_event(con->loop, &con->output, EV_WRITE); + iproto_msg_delete(msg); } +static struct cmsg_hop connect_route[] = { + { tx_process_connect, &net_pipe }, + { net_send_greeting, NULL }, +}; + /** }}} */ /** - * Create a connection context and start input. + * Create a connection and start input. */ static void iproto_on_accept(struct evio_service * /* service */, int fd, @@ -835,40 +788,154 @@ iproto_on_accept(struct evio_service * /* service */, int fd, con = iproto_connection_new(name, fd, addr); /* - * Ignore task allocation failure - the queue size is - * fixed so there is a limited number of tasks in + * Ignore msg allocation failure - the queue size is + * fixed so there is a limited number of msgs in * use, all stored in just a few blocks of the memory pool. */ - struct iproto_task *task = - iproto_task_new(con, iproto_process_connect); - task->iobuf = con->iobuf[0]; - iproto_queue_push(&tx_queue, task); + struct iproto_msg *msg = iproto_msg_new(con, connect_route); + msg->iobuf = con->iobuf[0]; + msg->close_connection = false; + cpipe_push(&tx_pipe, msg); } static struct evio_service binary; /* iproto binary listener */ -/** Initialize a read-write port. */ -void -iproto_init() +/** + * The network io thread main function: + * begin serving the message bus. + */ +static void +net_cord_f(va_list /* ap */) { - mempool_create(&iproto_task_pool, &cord()->slabc, - sizeof(struct iproto_task)); - iproto_queue_init(&tx_queue, iproto_tx_queue_cb); + /* Got to be called in every thread using iobuf */ + iobuf_init(); + mempool_create(&iproto_msg_pool, &cord()->slabc, + sizeof(struct iproto_msg)); + cpipe_create(&net_pipe); mempool_create(&iproto_connection_pool, &cord()->slabc, sizeof(struct iproto_connection)); evio_service_init(loop(), &binary, "binary", iproto_on_accept, NULL); + + cbus_join(&net_tx_bus, &net_pipe); + /* + * Nothing to do in the fiber so far, the service + * will take care of creating events for incoming + * connections. + */ + fiber_yield(); } +/** Initialize the iproto subsystem and start network io thread */ void -iproto_set_listen(const char *uri) +iproto_init() +{ + tx_cord = cord(); + + cbus_create(&net_tx_bus); + cpipe_create(&tx_pipe); + static struct cpipe_fiber_pool fiber_pool; + + cpipe_fiber_pool_create(&fiber_pool, "iproto", &tx_pipe, + IPROTO_FIBER_POOL_SIZE); + + static struct cord net_cord; + if (cord_costart(&net_cord, "iproto", net_cord_f, NULL)) + panic("failed to initialize iproto thread"); + + cbus_join(&net_tx_bus, &tx_pipe); +} + +/** + * Since there is no way to "synchronously" change the + * state of the io thread, to change the listen port + * we need to bounce a couple of messages to and + * from this thread. + */ +struct iproto_set_listen_msg: public cmsg { - if (evio_service_is_active(&binary)) - evio_service_stop(&binary); + /** + * If there was an error setting the listen port, + * this will contain the error when the message + * returns to the caller. + */ + struct diag diag; + /** + * The uri to set. + */ + const char *uri; + /** + * The way to tell the caller about the end of + * bind. + */ + struct cmsg_notify wakeup; +}; - if (uri != NULL) - coio_service_start(&binary, uri); +/** + * The bind has finished, notify the caller. 
+ */ +static void +iproto_on_bind(void *arg) +{ + cpipe_push(&tx_pipe, (struct cmsg_notify *) arg); +} + +static void +iproto_do_set_listen(struct cmsg *m) +{ + struct iproto_set_listen_msg *msg = + (struct iproto_set_listen_msg *) m; + try { + if (evio_service_is_active(&binary)) + evio_service_stop(&binary); + + if (msg->uri != NULL) { + binary.on_bind = iproto_on_bind; + binary.on_bind_param = &msg->wakeup; + evio_service_start(&binary, msg->uri); + } else { + iproto_on_bind(&msg->wakeup); + } + } catch (Exception *e) { + diag_move(&fiber()->diag, &msg->diag); + } +} + +static void +iproto_set_listen_msg_init(struct iproto_set_listen_msg *msg, + const char *uri) +{ + static cmsg_hop route[] = { { iproto_do_set_listen, NULL }, }; + cmsg_init(msg, route); + msg->uri = uri; + diag_create(&msg->diag); + + cmsg_notify_init(&msg->wakeup); +} + +void +iproto_set_listen(const char *uri) +{ + /** + * This is a tricky orchestration for something + * that should be pretty easy at the first glance: + * change the listen uri in the io thread. + * + * To do it, create a message which sets the new + * uri, and another one, which will alert tx + * thread when bind() on the new port is done. + */ + static struct iproto_set_listen_msg msg; + iproto_set_listen_msg_init(&msg, uri); + + cpipe_push(&net_pipe, &msg); + /** Wait for the end of bind. */ + fiber_yield(); + if (! diag_is_empty(&msg.diag)) { + diag_move(&msg.diag, &fiber()->diag); + diag_last_error(&fiber()->diag)->raise(); + } } /* vim: set foldmethod=marker */ diff --git a/src/box/iproto_port.cc b/src/box/iproto_port.cc index 5171205aa06c34656273b804b381d49d7b2b52a1..a966ae9a1111d3fc6bc48b2ad1decf0936e250d0 100644 --- a/src/box/iproto_port.cc +++ b/src/box/iproto_port.cc @@ -130,7 +130,7 @@ void iproto_reply_select(struct obuf *buf, struct obuf_svp *svp, uint64_t sync, uint32_t count) { - uint32_t len = obuf_used(buf) - svp->used - 5; + uint32_t len = obuf_size(buf) - svp->used - 5; struct iproto_header_bin header = iproto_header_bin; header.v_len = mp_bswap_u32(len); diff --git a/src/box/replica.cc b/src/box/replica.cc index 080ff04bf58ae55ded0694197a5a4a1c3e25ecb5..16a551c3eafe0d73f1a448d9fbbce6dcfa76bc7f 100644 --- a/src/box/replica.cc +++ b/src/box/replica.cc @@ -221,7 +221,7 @@ pull_from_remote(va_list ap) const char *err = NULL; try { struct xrow_header row; - if (! evio_is_active(&coio)) { + if (! evio_has_fd(&coio)) { remote_set_status(&r->remote, "connecting"); err = "can't connect to master"; remote_connect(r, &coio, iobuf); @@ -285,7 +285,7 @@ pull_from_remote(va_list ap) * * See: https://github.com/tarantool/tarantool/issues/136 */ - if (! evio_is_active(&coio)) + if (! evio_has_fd(&coio)) fiber_sleep(RECONNECT_DELAY); } } diff --git a/src/cbus.cc b/src/cbus.cc new file mode 100644 index 0000000000000000000000000000000000000000..56f5238171a4a2802c3395b409016f52f28ffd4f --- /dev/null +++ b/src/cbus.cc @@ -0,0 +1,300 @@ +/* + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. 
+ * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "cbus.h" +#include "scoped_guard.h" + +static void +cbus_flush_cb(ev_loop * /* loop */, struct ev_async *watcher, + int /* events */); + +static void +cpipe_fetch_output_cb(ev_loop * /* loop */, struct ev_async *watcher, + int /* events */) +{ + struct cpipe *pipe = (struct cpipe *) watcher->data; + struct cmsg *msg; + /* Force an exchange if there is nothing to do. */ + while (cpipe_peek(pipe)) { + while ((msg = cpipe_pop_output(pipe))) + cmsg_deliver(msg); + } +} + +void +cpipe_create(struct cpipe *pipe) +{ + STAILQ_INIT(&pipe->pipe); + STAILQ_INIT(&pipe->input); + STAILQ_INIT(&pipe->output); + + pipe->n_input = 0; + pipe->max_input = INT_MAX; + + ev_async_init(&pipe->flush_input, cbus_flush_cb); + pipe->flush_input.data = pipe; + + ev_async_init(&pipe->fetch_output, cpipe_fetch_output_cb); + pipe->fetch_output.data = pipe; + pipe->consumer = loop(); + ev_async_start(pipe->consumer, &pipe->fetch_output); +} + +static void +cpipe_join(struct cpipe *pipe, struct cbus *bus, struct cpipe *peer) +{ + assert(loop() == pipe->consumer); + pipe->bus = bus; + pipe->peer = peer; + pipe->producer = peer->consumer; +} + +void +cbus_create(struct cbus *bus) +{ + bus->pipe[0] = bus->pipe[1] = NULL; + + pthread_mutexattr_t errorcheck; + + (void) tt_pthread_mutexattr_init(&errorcheck); + +#ifndef NDEBUG + (void) tt_pthread_mutexattr_settype(&errorcheck, + PTHREAD_MUTEX_ERRORCHECK); +#endif + /* Initialize queue lock mutex. */ + (void) tt_pthread_mutex_init(&bus->mutex, &errorcheck); + (void) tt_pthread_mutexattr_destroy(&errorcheck); + + (void) tt_pthread_cond_init(&bus->cond, NULL); +} + +void +cbus_destroy(struct cbus *bus) +{ + (void) tt_pthread_mutex_destroy(&bus->mutex); + (void) tt_pthread_cond_destroy(&bus->cond); +} + +static inline void +cbus_lock(struct cbus *bus) +{ + tt_pthread_mutex_lock(&bus->mutex); +} + +static inline void +cbus_unlock(struct cbus *bus) +{ + tt_pthread_mutex_unlock(&bus->mutex); +} + +/** + * @pre both consumers initialized their pipes + * @post each consumers gets the input end of the opposite pipe + */ +struct cpipe * +cbus_join(struct cbus *bus, struct cpipe *pipe) +{ + /* + * We can't let one or the other thread go off and + * produce events/send ev_async callback messages + * until the peer thread has initialized the async + * and started it. + * Use a condition variable to make sure that two + * threads operate in sync. 
+	 */
+	cbus_lock(bus);
+	int pipe_idx = bus->pipe[0] != NULL;
+	int peer_idx = !pipe_idx;
+	bus->pipe[pipe_idx] = pipe;
+	while (bus->pipe[peer_idx] == NULL)
+		tt_pthread_cond_wait(&bus->cond, &bus->mutex);
+	cpipe_join(pipe, bus, bus->pipe[peer_idx]);
+	cbus_unlock(bus);
+	/*
+	 * POSIX: the pthread_cond_signal() function shall
+	 * have no effect if there are no threads currently
+	 * blocked on cond.
+	 */
+	pthread_cond_signal(&bus->cond);
+	return bus->pipe[peer_idx];
+}
+
+static void
+cbus_flush_cb(ev_loop * /* loop */, struct ev_async *watcher,
+	      int /* events */)
+{
+	struct cpipe *pipe = (struct cpipe *) watcher->data;
+	if (pipe->n_input == 0)
+		return;
+	struct cpipe *peer = pipe->peer;
+	assert(pipe->producer == loop());
+	assert(peer->consumer == loop());
+
+	/* Trigger task processing when the queue becomes non-empty. */
+	bool pipe_was_empty;
+	bool peer_output_was_empty = STAILQ_EMPTY(&peer->output);
+
+	cbus_lock(pipe->bus);
+	pipe_was_empty = STAILQ_EMPTY(&pipe->pipe);
+	/** Flush input */
+	STAILQ_CONCAT(&pipe->pipe, &pipe->input);
+	/*
+	 * While at it, pop output.
+	 * The consumer of the output of the bound queue is the
+	 * same as the producer of input, so we can safely access it.
+	 * We can safely access the queue because it's locked.
+	 */
+	STAILQ_CONCAT(&peer->output, &peer->pipe);
+	cbus_unlock(pipe->bus);
+
+	pipe->n_input = 0;
+	if (pipe_was_empty)
+		ev_async_send(pipe->consumer, &pipe->fetch_output);
+	if (peer_output_was_empty && !STAILQ_EMPTY(&peer->output))
+		ev_feed_event(peer->consumer, &peer->fetch_output, EV_CUSTOM);
+}
+
+struct cmsg *
+cpipe_peek_impl(struct cpipe *pipe)
+{
+	assert(STAILQ_EMPTY(&pipe->output));
+
+	struct cpipe *peer = pipe->peer;
+	assert(pipe->consumer == loop());
+	assert(peer->producer == loop());
+
+	bool peer_pipe_was_empty = false;
+	cbus_lock(pipe->bus);
+	STAILQ_CONCAT(&pipe->output, &pipe->pipe);
+	if (! STAILQ_EMPTY(&peer->input)) {
+		peer_pipe_was_empty = STAILQ_EMPTY(&peer->pipe);
+		STAILQ_CONCAT(&peer->pipe, &peer->input);
+	}
+	cbus_unlock(pipe->bus);
+	peer->n_input = 0;
+
+	if (peer_pipe_was_empty)
+		ev_async_send(peer->consumer, &peer->fetch_output);
+	return STAILQ_FIRST(&pipe->output);
+}
+
+
+static void
+cmsg_notify_deliver(struct cmsg *msg)
+{
+	fiber_wakeup(((struct cmsg_notify *) msg)->fiber);
+}
+
+void
+cmsg_notify_init(struct cmsg_notify *msg)
+{
+	static cmsg_hop route[] = { { cmsg_notify_deliver, NULL }, };
+
+	cmsg_init(msg, route);
+	msg->fiber = fiber();
+}
+
+
+/** Return true if there are too many active workers in the pool. */
+static inline bool
+cpipe_fiber_pool_needs_throttling(struct cpipe_fiber_pool *pool)
+{
+	return pool->size > pool->max_size;
+}
+
+/**
+ * Main function of the fiber invoked to handle all outstanding
+ * tasks in a queue.
+ */
+static void
+cpipe_fiber_pool_f(va_list ap)
+{
+	struct cpipe_fiber_pool *pool = va_arg(ap, struct cpipe_fiber_pool *);
+	struct cpipe *pipe = pool->pipe;
+	struct cmsg *msg;
+	pool->size++;
+	auto size_guard = make_scoped_guard([=]{ pool->size--; });
+restart:
+	while ((msg = cpipe_pop_output(pipe)))
+		cmsg_deliver(msg);
+
+	if (pool->cache_size < 2 * pool->max_size) {
+		/** Put the current fiber into a fiber cache. */
+		rlist_add_entry(&pool->fiber_cache, fiber(), state);
+		pool->size--;
+		pool->cache_size++;
+		fiber_yield();
+		pool->cache_size--;
+		pool->size++;
+		goto restart;
+	}
+}
+
+
+/** Create fibers to handle all outstanding tasks.
*/ +static void +cpipe_fiber_pool_cb(ev_loop * /* loop */, struct ev_async *watcher, + int /* events */) +{ + struct cpipe_fiber_pool *pool = (struct cpipe_fiber_pool *) watcher->data; + struct cpipe *pipe = pool->pipe; + (void) cpipe_peek(pipe); + while (! STAILQ_EMPTY(&pipe->output)) { + struct fiber *f; + if (! rlist_empty(&pool->fiber_cache)) { + f = rlist_shift_entry(&pool->fiber_cache, + struct fiber, state); + fiber_call(f); + } else if (! cpipe_fiber_pool_needs_throttling(pool)) { + f = fiber_new(pool->name, cpipe_fiber_pool_f); + fiber_start(f, pool); + } else { + /** + * No worries that this watcher may not + * get scheduled again - there are enough + * worker fibers already, so just leave. + */ + break; + } + } +} + +void +cpipe_fiber_pool_create(struct cpipe_fiber_pool *pool, + const char *name, struct cpipe *pipe, + int max_pool_size) +{ + rlist_create(&pool->fiber_cache); + pool->name = name; + pool->pipe = pipe; + pool->size = 0; + pool->cache_size = 0; + pool->max_size = max_pool_size; + cpipe_set_fetch_cb(pipe, cpipe_fiber_pool_cb, pool); +} diff --git a/src/cbus.h b/src/cbus.h new file mode 100644 index 0000000000000000000000000000000000000000..98825ce7b4b19c631f69e215e167f8f2cebc3bad --- /dev/null +++ b/src/cbus.h @@ -0,0 +1,421 @@ +#ifndef TARANTOOL_CBUS_H_INCLUDED +#define TARANTOOL_CBUS_H_INCLUDED +/* + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "fiber.h" +#include "coio.h" + +/** cbus, cmsg - inter-cord bus and messaging */ + +struct cmsg; +struct cpipe; +typedef void (*cmsg_f)(struct cmsg *); + +/** + * One hop in a message travel route. A message may need to be + * delivered to many destinations before it can be dispensed with. + * For example, it may be necessary to return a message to the + * sender just to destroy it. + * + * Message travel route is an array of cmsg_hop entries. The first + * entry contains a delivery function at the first destination, + * and the next destination. Subsequent entries are alike. The + * last entry has a delivery function (most often a message + * destructor) and NULL for the next destination. + */ +struct cmsg_hop { + /** The message delivery function. */ + cmsg_f f; + /** + * The next destination to which the message + * should be routed after its delivered locally. 
+ */ + struct cpipe *pipe; +}; + +/** A message traveling between cords. */ +struct cmsg { + /** + * A member of the linked list - fifo of the pipe the + * message is stuck in currently, waiting to get + * delivered. + */ + STAILQ_ENTRY(cmsg) fifo; + /** The message routing path. */ + struct cmsg_hop *route; + /** The current hop the message is at. */ + struct cmsg_hop *hop; +}; + +/** Initialize the message and set its route. */ +static inline void +cmsg_init(struct cmsg *msg, struct cmsg_hop *route) +{ + /** + * The first hop can be done explicitly with cbus_push(), + * msg->hop thus points to the second hop. + */ + msg->hop = msg->route = route; +} + +STAILQ_HEAD(cmsg_fifo, cmsg); + +#define CACHELINE_SIZE 64 +/** A uni-directional FIFO queue from one cord to another. */ +struct cpipe { + /** + * The protected part of the pipe: only accessible + * in a critical section. + * The message flow in the pipe is: + * input <-- owned by the consumer thread + * v + * pipe <-- shared, protected by a mutex + * v + * output <-- owned by the producer thread + */ + struct { + struct cmsg_fifo pipe; + /** Peer pipe - the other direction of the bus. */ + struct cpipe *peer; + struct cbus *bus; + } __attribute__((aligned(CACHELINE_SIZE))); + /** Stuff most actively used in the producer thread. */ + struct { + /** Staging area for pushed messages */ + struct cmsg_fifo input; + /** Counters are useful for finer-grained scheduling. */ + int n_input; + /** + * When pushing messages, keep the staged input size under + * this limit (speeds up message delivery and reduces + * latency, while still keeping the bus mutex cold enough). + */ + int max_input; + /** + * Rather than flushing input into the pipe + * whenever a single message or a batch is + * complete, do it once per event loop iteration. + */ + struct ev_async flush_input; + /** The producer thread. */ + struct ev_loop *producer; + /** The consumer thread. */ + struct ev_loop *consumer; + } __attribute__((aligned(CACHELINE_SIZE))); + /** Stuff related to the consumer. */ + struct { + /** Staged messages (for pop) */ + struct cmsg_fifo output; + /** + * Used to trigger task processing when + * the pipe becomes non-empty. + */ + struct ev_async fetch_output; + } __attribute__((aligned(CACHELINE_SIZE))); +}; + +#undef CACHELINE_SIZE + +/** + * Initialize a pipe. Must be called by the consumer. + */ +void +cpipe_create(struct cpipe *pipe); + +/** + * Reset the default fetch output callback with a custom one. + */ +static inline void +cpipe_set_fetch_cb(struct cpipe *pipe, ev_async_cb fetch_output_cb, + void *data) +{ + assert(loop() == pipe->consumer); + /* + * According to libev documentation, you can set cb at + * virtually any time, modulo threads. + */ + ev_set_cb(&pipe->fetch_output, fetch_output_cb); + pipe->fetch_output.data = data; +} + +/** + * Pop a single message from the staged output area. If + * the output is empty, returns NULL. There may be messages + * in the pipe! + */ +static inline struct cmsg * +cpipe_pop_output(struct cpipe *pipe) +{ + assert(loop() == pipe->consumer); + + if (STAILQ_EMPTY(&pipe->output)) + return NULL; + struct cmsg *msg = STAILQ_FIRST(&pipe->output); + STAILQ_REMOVE_HEAD(&pipe->output, fifo); + return msg; +} + +struct cmsg * +cpipe_peek_impl(struct cpipe *pipe); + +/** + * Check if the pipe has any messages. Triggers a bus + * exchange in a critical section if the pipe is empty. 
+ */
+static inline struct cmsg *
+cpipe_peek(struct cpipe *pipe)
+{
+	assert(loop() == pipe->consumer);
+
+	if (STAILQ_EMPTY(&pipe->output))
+		return cpipe_peek_impl(pipe);
+
+	return STAILQ_FIRST(&pipe->output);
+}
+
+/**
+ * Pop a single message. Triggers a bus exchange
+ * if the pipe is empty.
+ */
+static inline struct cmsg *
+cpipe_pop(struct cpipe *pipe)
+{
+	if (cpipe_peek(pipe) == NULL)
+		return NULL;
+	return cpipe_pop_output(pipe);
+}
+
+/**
+ * Set pipe max size of staged push area. The default is infinity.
+ * If staged push cap is set, the pushed messages are flushed
+ * whenever the area has more messages than the cap, and also once
+ * per event loop.
+ * Otherwise, the messages are flushed once per event loop iteration.
+ *
+ * @todo: collect bus stats per second and adjust max_input once
+ * a second to keep the mutex cold regardless of the message load,
+ * while still keeping the latency low if there are few
+ * long-to-process messages.
+ */
+static inline void
+cpipe_set_max_input(struct cpipe *pipe, int max_input)
+{
+	assert(loop() == pipe->producer);
+	pipe->max_input = max_input;
+}
+
+/**
+ * Flush all staged messages into the pipe and eventually to the
+ * consumer.
+ */
+static inline void
+cpipe_flush_input(struct cpipe *pipe)
+{
+	assert(loop() == pipe->producer);
+
+	/** Flush may be called with no input. */
+	if (pipe->n_input > 0) {
+		if (pipe->n_input < pipe->max_input) {
+			/*
+			 * Not much input, can deliver all
+			 * messages at the end of the event loop
+			 * iteration.
+			 */
+			ev_feed_event(pipe->producer,
+				      &pipe->flush_input, EV_CUSTOM);
+		} else {
+			/*
+			 * Wow, it's a lot of stuff piled up,
+			 * deliver immediately.
+			 */
+			ev_invoke(pipe->producer,
+				  &pipe->flush_input, EV_CUSTOM);
+		}
+	}
+}
+
+/**
+ * Push a single message to the pipe input. The message is pushed
+ * to a staging area. To be delivered, the input needs to be
+ * flushed with cpipe_flush_input().
+ */
+static inline void
+cpipe_push_input(struct cpipe *pipe, struct cmsg *msg)
+{
+	assert(loop() == pipe->producer);
+
+	STAILQ_INSERT_TAIL(&pipe->input, msg, fifo);
+	pipe->n_input++;
+	if (pipe->n_input >= pipe->max_input)
+		ev_invoke(pipe->producer, &pipe->flush_input, EV_CUSTOM);
+}
+
+/**
+ * Push a single message and ensure it's delivered.
+ * A combo of push_input + flush_input for cases when
+ * it's not known at all whether there'll be other
+ * messages coming up.
+ */
+static inline void
+cpipe_push(struct cpipe *pipe, struct cmsg *msg)
+{
+	cpipe_push_input(pipe, msg);
+	assert(pipe->n_input < pipe->max_input);
+	if (pipe->n_input == 1)
+		ev_feed_event(pipe->producer, &pipe->flush_input, EV_CUSTOM);
+}
+
+/**
+ * Cord interconnect: two pipes, one for each message flow
+ * direction.
+ */
+struct cbus {
+	/** Two pipes for two directions between two cords. */
+	struct cpipe *pipe[2];
+	/**
+	 * A single mutex to protect all exchanges around the
+	 * two pipes involved in the bus.
+	 */
+	pthread_mutex_t mutex;
+	/** Condition for synchronized start of the bus. */
+	pthread_cond_t cond;
+};
+
+void
+cbus_create(struct cbus *bus);
+
+/**
+ * Connect the pipes: join cord1 input to the cord2 output,
+ * and cord1 output to cord2 input.
+ * Each cord must invoke this method in its own scope,
+ * and provide its own callback to handle incoming messages
+ * (pop_output_cb).
+ * This call synchronizes two threads, and after this method
+ * returns, cpipe_push/cpipe_pop are safe to use.
+ *
+ * @param bus the bus
+ * @param pipe the pipe for which this thread is a consumer
+ *
+ * @retval the pipe for which this thread is a producer
+ *
+ * @example:
+ * cpipe_create(&in);
+ * struct cpipe *out = cbus_join(bus, &in);
+ * cpipe_set_max_input(out, 128);
+ * cpipe_push(out, msg);
+ */
+struct cpipe *
+cbus_join(struct cbus *bus, struct cpipe *pipe);
+
+/**
+ * Dispatch the message to the next hop.
+ */
+static inline void
+cmsg_dispatch(struct cpipe *pipe, struct cmsg *msg)
+{
+	/**
+	 * The 'pipe' pointer saved in the guard constructor works as
+	 * a guard that the message is alive. If a message route
+	 * has the next pipe, then the message mustn't have been
+	 * destroyed on this hop. Otherwise msg->hop->pipe could
+	 * already be pointing to garbage.
+	 */
+	if (pipe) {
+		/*
+		 * Once we pushed the message to the bus,
+		 * we relinquished all write access to it,
+		 * so we must increase the current hop *before*
+		 * push.
+		 */
+		msg->hop++;
+		cpipe_push(pipe, msg);
+	}
+}
+
+struct CmsgDispatchGuard {
+	struct cpipe *pipe;
+	struct cmsg *msg;
+	CmsgDispatchGuard(struct cmsg *msg_arg)
+		:pipe(msg_arg->hop->pipe), msg(msg_arg) {}
+	~CmsgDispatchGuard() { cmsg_dispatch(pipe, msg); }
+};
+
+/**
+ * Deliver the message and dispatch it to the next hop.
+ */
+static inline void
+cmsg_deliver(struct cmsg *msg)
+{
+	/**
+	 * Ensure dispatch happens even if there is an exception,
+	 * otherwise the message may leak.
+	 */
+	CmsgDispatchGuard guard(msg);
+	msg->hop->f(msg);
+}
+
+/**
+ * A helper message to wake up the caller whenever an event
+ * occurs.
+ */
+struct cmsg_notify: public cmsg
+{
+	struct fiber *fiber;
+};
+
+void
+cmsg_notify_init(struct cmsg_notify *msg);
+
+/**
+ * A pool of worker fibers to handle messages,
+ * so that each message is handled in its own fiber.
+ */
+struct cpipe_fiber_pool {
+	const char *name;
+	/** Cache of fibers which work on incoming messages. */
+	struct rlist fiber_cache;
+	/** The number of active fibers working on tasks. */
+	int size;
+	/** The number of sleeping fibers, in the cache */
+	int cache_size;
+	/** The limit on the number of fibers working on tasks. */
+	int max_size;
+	struct cpipe *pipe;
+};
+
+/**
+ * Initialize a fiber pool and connect it to a pipe. Currently
+ * must be done before the pipe is actively used by a bus.
+ */
+void
+cpipe_fiber_pool_create(struct cpipe_fiber_pool *pool,
+			const char *name, struct cpipe *pipe,
+			int max_pool_size);
+
+#endif /* TARANTOOL_CBUS_H_INCLUDED */
diff --git a/src/coio.cc b/src/coio.cc
index 09b3337a0f927ad8174a049be6e13386a314385e..71ed0b111348794cf04d08d089f598003859277f 100644
--- a/src/coio.cc
+++ b/src/coio.cc
@@ -202,12 +202,12 @@ coio_connect_timeout(struct ev_io *coio, struct uri *uri, struct sockaddr *addr,
 		hints.ai_protocol = 0;
 		int rc = coio_getaddrinfo(host, service, &hints, &ai, delay);
 		if (rc != 0) {
-			ai = NULL;
+			if (errno == ETIMEDOUT)
+				return -1; /* timeout */
+			tnt_raise(SocketError, -1, "getaddrinfo");
 		}
-	}
-	if (ai == NULL)
-		return -1; /* timeout */
+	}
 	auto addrinfo_guard = make_scoped_guard([=] {
 		if (!uri->host_hint) freeaddrinfo(ai);
 		else free(ai_local.ai_addr);
@@ -215,7 +215,7 @@ coio_connect_timeout(struct ev_io *coio, struct uri *uri, struct sockaddr *addr,
 	evio_timeout_update(loop(), start, &delay);
 	coio_timeout_init(&start, &delay, timeout);
-	assert(! evio_is_active(coio));
+	assert(! 
evio_has_fd(coio)); while (ai) { try { if (coio_connect_addr(coio, ai->ai_addr, diff --git a/src/evio.cc b/src/evio.cc index 53884a803bdf82e4e77d9e0f8f7cec1cd74de4b8..7730775ec5d42aadc803896b36bb8ec45f8b1c0d 100644 --- a/src/evio.cc +++ b/src/evio.cc @@ -51,7 +51,7 @@ evio_close(ev_loop *loop, struct ev_io *evio) ev_io_stop(loop, evio); /* Close the socket. */ close(evio->fd); - /* Make sure evio_is_active() returns a proper value. */ + /* Make sure evio_has_fd() returns a proper value. */ evio->fd = -1; } diff --git a/src/evio.h b/src/evio.h index ff597226833ad58e2b999730fec876ca728e1cf7..2a2bab70966ed901f66cc40b0c8598f1bd8de3d9 100644 --- a/src/evio.h +++ b/src/evio.h @@ -139,7 +139,7 @@ evio_service_is_active(struct evio_service *service) } static inline bool -evio_is_active(struct ev_io *ev) +evio_has_fd(struct ev_io *ev) { return ev->fd >= 0; } diff --git a/src/iobuf.cc b/src/iobuf.cc index 560cd90a52b113f7324b4ddf7fe2cc08ca7f85d8..b1aad0c7680a151e0db411beb07c8364f1311720 100644 --- a/src/iobuf.cc +++ b/src/iobuf.cc @@ -90,7 +90,7 @@ iobuf_delete_mt(struct iobuf *iobuf) { ibuf_destroy(&iobuf->in); /* Destroyed by the caller. */ - assert(&iobuf->out.pos == 0 && iobuf->out.iov[0].iov_base == NULL); + assert(iobuf->out.pos == 0 && iobuf->out.iov[0].iov_base == NULL); mempool_free(&iobuf_pool, iobuf); } diff --git a/src/iobuf.h b/src/iobuf.h index 8c6c66c07e6fc32be8c19e0281b1ac086988f996..671813712b00862def9314666c93edb225daa092 100644 --- a/src/iobuf.h +++ b/src/iobuf.h @@ -33,7 +33,6 @@ #include "small/ibuf.h" #include "small/obuf.h" -/** {{{ Input/output pair. */ struct iobuf { /** Input buffer. */ @@ -86,7 +85,7 @@ iobuf_reset(struct iobuf *iobuf); static inline bool iobuf_is_idle(struct iobuf *iobuf) { - return ibuf_used(&iobuf->in) == 0 && obuf_used(&iobuf->out) == 0; + return ibuf_used(&iobuf->in) == 0 && obuf_size(&iobuf->out) == 0; } /** @@ -99,6 +98,4 @@ iobuf_init(); void iobuf_set_readahead(int readahead); -/* }}} */ - #endif /* TARANTOOL_IOBUF_H_INCLUDED */ diff --git a/src/lib/small/obuf.c b/src/lib/small/obuf.c index dcd8b526b89dd7ec9ab5ac302dc35e9f457ce0b6..13e9041a63a9945f7d0db90f208bdd43a152b46d 100644 --- a/src/lib/small/obuf.c +++ b/src/lib/small/obuf.c @@ -70,6 +70,7 @@ obuf_create(struct obuf *buf, struct slab_cache *slabc, size_t start_capacity) buf->iov[0].iov_base = NULL; buf->iov[0].iov_len = 0; buf->capacity[0] = 0; + buf->wend = buf->wpos = obuf_create_svp(buf); } @@ -82,6 +83,7 @@ obuf_reset(struct obuf *buf) buf->iov[i].iov_len = 0; buf->pos = 0; buf->used = 0; + buf->wend = buf->wpos = obuf_create_svp(buf); } void diff --git a/src/lib/small/obuf.h b/src/lib/small/obuf.h index de0f2f99e0af6f298f0730a1b1c636e9ee7683b1..2be841ac5439ae717e57f468ef9d0231b3423f35 100644 --- a/src/lib/small/obuf.h +++ b/src/lib/small/obuf.h @@ -78,15 +78,28 @@ struct obuf * obuf_reserve(). */ size_t start_capacity; + /** How many bytes are actually allocated for each iovec. */ + size_t capacity[SMALL_OBUF_IOV_MAX + 1]; /** * List of iovec vectors, each vector is at least twice * as big as the previous one. The vector following the * last allocated one is always zero-initialized * (iov_base = NULL, iov_len = 0). + * Make it the last member to reduce friction around + * wpos/wend in iproto thread - last elements + * of iov are unlikely to be updated often. */ struct iovec iov[SMALL_OBUF_IOV_MAX + 1]; - /** How many bytes are actually allocated for each iovec. 
*/ - size_t capacity[SMALL_OBUF_IOV_MAX + 1]; + /** + * The below two members are used by iproto thread, + * avoid false sharing by cache aligning them. + */ + struct { + /** Current write position in the output buffer */ + struct obuf_svp wpos; + /** End of write position in the output buffer */ + struct obuf_svp wend; + } __attribute__((aligned(64))); }; void @@ -100,11 +113,17 @@ obuf_reset(struct obuf *buf); /** How many bytes are in the output buffer. */ static inline size_t -obuf_used(struct obuf *obuf) +obuf_size(struct obuf *obuf) { return obuf->used; } +static inline size_t +obuf_used(struct obuf *obuf) +{ + return obuf->wend.used - obuf->wpos.used; +} + /** The size of iov vector in the buffer. */ static inline int obuf_iovcnt(struct obuf *buf) diff --git a/test/replication/cluster.result b/test/replication/cluster.result index 8268cb49c928a56530282207372c01dd15a0056c..5200f3f9e154a11d382f51c21a894e478b56029d 100644 --- a/test/replication/cluster.result +++ b/test/replication/cluster.result @@ -114,7 +114,7 @@ box.info.vclock[2] ... box.space._schema:replace{"test", 48} --- -- error: Can't modify data because this server in read-only mode. +- error: Can't modify data because this server is in read-only mode. ... box.space._cluster:insert{10, "<replica uuid>"} --- diff --git a/test/replication/readonly.result b/test/replication/readonly.result index dd38c37d9d0d642d8bc6f6e8389af98152c91a90..a7848717d0a7bcc1fcea98adc0807cef4fa23889 100644 --- a/test/replication/readonly.result +++ b/test/replication/readonly.result @@ -33,7 +33,7 @@ box.info.server.lsn ... space = box.schema.space.create("ro") --- -- error: Can't modify data because this server in read-only mode. +- error: Can't modify data because this server is in read-only mode. ... box.info.vclock[2] ---
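
Below is a minimal usage sketch of the two-hop routing pattern that request_route and disconnect_route in iproto.cc rely on, written against the cpipe/cmsg API introduced by this patch in src/cbus.h. It is illustrative only and not part of the patch: echo_msg, tx_do_echo, net_finish_echo and net_push_echo are hypothetical names, plain malloc()/free() stands in for the mempools the patch actually uses, and say_info() is assumed to come from src/say.h.

#include <stdlib.h>
#include "cbus.h"
#include "say.h"

/* A message carrying one request from the net thread to the tx thread. */
struct echo_msg: public cmsg {
	const char *payload;	/* owned by the net thread */
};

/* One pipe per direction; each is created by its consumer thread. */
static struct cpipe tx_pipe;	/* net -> tx, consumed by the tx thread  */
static struct cpipe net_pipe;	/* tx -> net, consumed by the net thread */

/* Hop 1: runs in the tx thread and does the actual work. */
static void
tx_do_echo(struct cmsg *m)
{
	struct echo_msg *msg = (struct echo_msg *) m;
	say_info("echo: %s", msg->payload);
}

/* Hop 2: runs back in the net thread and releases the message. */
static void
net_finish_echo(struct cmsg *m)
{
	free(m);
}

/*
 * After the first hop the message is forwarded to net_pipe;
 * the NULL pipe in the last hop ends the route.
 */
static struct cmsg_hop echo_route[] = {
	{ tx_do_echo, &net_pipe },
	{ net_finish_echo, NULL },
};

/* Called in the net thread: set the route and hand the message off. */
static void
net_push_echo(const char *payload)
{
	struct echo_msg *msg = (struct echo_msg *) malloc(sizeof(*msg));
	cmsg_init(msg, echo_route);
	msg->payload = payload;
	cpipe_push(&tx_pipe, msg);
}

The two pipes must already be connected with cbus_create()/cbus_join() before the first cpipe_push(), in the same way iproto_init() and net_cord_f() wire up tx_pipe and net_pipe in the patch.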