/*
 * Copyright 2010-2015, Tarantool AUTHORS, please see AUTHORS file.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * 1. Redistributions of source code must retain the above
 *    copyright notice, this list of conditions and the
 *    following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above
 *    copyright notice, this list of conditions and the following
 *    disclaimer in the documentation and/or other materials
 *    provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
 * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
#include "iproto.h"
#include <string.h>
#include <stdint.h>
#include <stdarg.h>
#include <stdio.h>

#include "iproto_port.h"
#include "main.h"
#include "fiber.h"
#include "cbus.h"
#include "say.h"
#include "evio.h"
#include "scoped_guard.h"
#include "memory.h"
#include "msgpuck/msgpuck.h"
#include "session.h"
#include "third_party/base64.h"
#include "coio.h"
#include "xrow.h"
#include "iproto_constants.h"
#include "user_def.h"
#include "authentication.h"
#include "rmean.h"
#include "lua/call.h"

/* {{{ iproto_msg - declaration */

/**
 * A single msg from the io thread. All requests
 * from all connections are queued into a single queue
 * and processed in FIFO order.
 */
struct iproto_msg: public cmsg
{
	struct iproto_connection *connection;

	/* --- Box msgs - actual requests for the transaction processor --- */
	/* Request message code and sync. */
	struct xrow_header header;
	/* Box request, if this is a DML */
	struct request request;
	/*
	 * Remember the active iobuf of the connection,
	 * in which the request is stored. The response
	 * must be put into the out buffer of this iobuf.
	 */
	struct iobuf *iobuf;
	/**
	 * How much space the request takes in the
	 * input buffer (len, header and body - all of it)
	 * This also works as a reference counter to
	 * iproto_connection object.
	 */
	size_t len;
	/** End of write position in the output buffer */
	struct obuf_svp write_end;
	/**
	 * Used in "connect" msgs, true if connect trigger failed
	 * and the connection must be closed.
	 */
	bool close_connection;
};

static struct mempool iproto_msg_pool;

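/** Allocate a message from the pool and initialize its route. */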
static struct iproto_msg *
iproto_msg_new(struct iproto_connection *con, struct cmsg_hop *route)
{
	struct iproto_msg *msg =
		(struct iproto_msg *) mempool_alloc(&iproto_msg_pool);
	cmsg_init(msg, route);
	msg->connection = con;
	return msg;
}

static inline void
iproto_msg_delete(struct cmsg *msg)
{
	mempool_free(&iproto_msg_pool, msg);
}

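/** Exception-safety guard: frees the msg unless release() is called. */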
struct IprotoMsgGuard {
	struct iproto_msg *msg;
	IprotoMsgGuard(struct iproto_msg *msg_arg):msg(msg_arg) {}
	~IprotoMsgGuard()
	{ if (msg) iproto_msg_delete(msg); }
	struct iproto_msg *release()
	{ struct iproto_msg *tmp = msg; msg = NULL; return tmp; }
};

enum { IPROTO_FIBER_POOL_SIZE = 1024, IPROTO_FIBER_POOL_IDLE_TIMEOUT = 3 };

/* }}} */

/* {{{ iproto connection and requests */

/**
 * A single global queue for all requests in all connections. All
 * requests from all connections are processed concurrently.
 * It is also used as a queue for newly established connections
 * and to execute disconnect triggers. A few notes about these
 * triggers:
 * - they need to be run in a fiber
 * - unlike an ordinary request failure, on_connect trigger
 *   failure must lead to connection close.
 * - on_connect trigger must be processed before any other
 *   request on this connection.
 */
static struct cpipe tx_pipe;
static struct cpipe net_pipe;
static struct cbus net_tx_bus;
/* A pointer to the transaction processor cord. */
struct cord *tx_cord;

/** Context of a single client connection. */
struct iproto_connection
{
	/**
	 * Two rotating buffers for I/O. Input is always read into
	 * iobuf[0]. As soon as iobuf[0] input buffer becomes full,
	 * iobuf[0] is moved to iobuf[1], for flushing. As soon as
	 * all output in iobuf[1].out is sent to the client, iobuf[1]
	 * and iobuf[0] are moved around again.
	 */
	struct iobuf *iobuf[2];
	/*
	 * The size of the readahead which is not parsed yet, i.e.
	 * the size of a request piece which is not fully read.
	 * It is always relative to iobuf[0]->in.wpos. In other words,
	 * iobuf[0]->in.wpos - parse_size gives the start of the
	 * unparsed request. A size, rather than a pointer, is used
	 * to be safe in case in->buf is reallocated. Being relative
	 * to in->wpos, rather than to in->rpos, helps make sure that
	 * ibuf_reserve() or iobuf rotation don't make the value
	 * meaningless.
	 */
	ssize_t parse_size;
	struct ev_io input;
	struct ev_io output;
	/** Logical session. */
	struct session *session;
	uint64_t cookie;
	ev_loop *loop;
	/* Pre-allocated disconnect msg. */
	struct iproto_msg *disconnect;
};

static struct mempool iproto_connection_pool;

/**
 * A connection is idle when the client is gone
 * and there are no outstanding msgs in the msg queue.
 * An idle connection can be safely garbage collected.
 * Note: a connection only becomes idle after
 * iproto_connection_close(), which closes the fd. This is why
 * the code checks evio_has_fd(), not ev_is_active() (which is
 * false if the event is not started).
 *
 * ibuf_used() provides an effective reference counter
 * on connection use in the tx request queue. Any request
 * in the request queue has a non-zero len, and ibuf_used()
 * is therefore non-zero as long as there is at least
 * one request in the tx queue.
 */
static inline bool
iproto_connection_is_idle(struct iproto_connection *con)
{
	return ibuf_used(&con->iobuf[0]->in) == 0 &&
		ibuf_used(&con->iobuf[1]->in) == 0;
}

static void
iproto_connection_on_input(ev_loop * /* loop */, struct ev_io *watcher,
			   int /* revents */);
static void
iproto_connection_on_output(ev_loop * /* loop */, struct ev_io *watcher,
			    int /* revents */);

/** Recycle a connection. Never throws. */
static inline void
iproto_connection_delete(struct iproto_connection *con)
{
	assert(iproto_connection_is_idle(con));
	assert(!evio_has_fd(&con->output));
	assert(!evio_has_fd(&con->input));
	assert(con->session == NULL);
	/*
	 * The output buffers must have been deleted
	 * in tx thread.
	 */
	iobuf_delete_mt(con->iobuf[0]);
	iobuf_delete_mt(con->iobuf[1]);
	if (con->disconnect)
		iproto_msg_delete(con->disconnect);
	mempool_free(&iproto_connection_pool, con);
}

static void
tx_process_msg(struct cmsg *msg);

static void
net_send_msg(struct cmsg *msg);

/**
 * Fire on_disconnect triggers in the tx
 * thread and destroy the session object,
 * as well as output buffers of the connection.
 */
static void
tx_process_disconnect(struct cmsg *m)
{
	struct iproto_msg *msg = (struct iproto_msg *) m;
	struct iproto_connection *con = msg->connection;
	if (con->session) {
		if (! rlist_empty(&session_on_disconnect))
			session_run_on_disconnect_triggers(con->session);
		session_destroy(con->session);
		con->session = NULL; /* safety */
	}
	/*
	 * Got to be done in iproto thread since
	 * that's where the memory is allocated.
	 */
	obuf_destroy(&con->iobuf[0]->out);
	obuf_destroy(&con->iobuf[1]->out);
}

/**
 * Cleanup the net thread resources of a connection
 * and close the connection.
 */
static void
net_finish_disconnect(struct cmsg *m)
{
	struct iproto_msg *msg = (struct iproto_msg *) m;
	/* The disconnect trigger already ran in tx; just free resources. */
	iproto_connection_delete(msg->connection);
	iproto_msg_delete(msg);
}

static struct cmsg_hop disconnect_route[] = {
	{ tx_process_disconnect, &net_pipe },
	{ net_finish_disconnect, NULL },
};


static struct cmsg_hop request_route[] = {
	{ tx_process_msg, &net_pipe },
	{ net_send_msg, NULL },
};
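
/**
 * Create a connection context. Both watchers poll the same fd;
 * the iobufs are allocated from the tx cord's slab cache, since
 * their output halves are filled in the tx thread.
 */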
static struct iproto_connection *
iproto_connection_new(const char *name, int fd, struct sockaddr *addr)
{
	(void) name;
	struct iproto_connection *con = (struct iproto_connection *)
		mempool_alloc(&iproto_connection_pool);
	con->input.data = con->output.data = con;
	con->loop = loop();
	ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ);
	ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);
	con->iobuf[0] = iobuf_new_mt(&tx_cord->slabc);
	con->iobuf[1] = iobuf_new_mt(&tx_cord->slabc);
	con->parse_size = 0;
	con->session = NULL;
	con->cookie = *(uint64_t *) addr;
	/* It may be very awkward to allocate at close. */
	con->disconnect = iproto_msg_new(con, disconnect_route);
	return con;
}

/**
 * Initiate a connection shutdown. This method may
 * be invoked many times, and does the internal
 * bookkeeping to only cleanup resources once.
 */
static inline void
iproto_connection_close(struct iproto_connection *con)
{
	if (evio_has_fd(&con->input)) {
		/* Clears all pending events. */
		ev_io_stop(con->loop, &con->input);
		ev_io_stop(con->loop, &con->output);

		int fd = con->input.fd;
		/* Make evio_has_fd() happy */
		con->input.fd = con->output.fd = -1;
		close(fd);
		/*
		 * Discard unparsed data, to recycle the
		 * connection in net_send_msg() as soon as all
		 * parsed data is processed.  It's important this
		 * is done only once.
		 */
		con->iobuf[0]->in.wpos -= con->parse_size;
	}
	/*
	 * If the connection has no outstanding requests in the
	 * input buffer, then no one (e.g. tx thread) is referring
	 * to it, so it must be destroyed at once. Queue a msg to
	 * run on_disconnect() trigger and destroy the connection.
	 *
	 * Otherwise, it will be destroyed by the last request on
	 * this connection that has finished processing.
	 *
	 * The check is mandatory to not destroy a connection
	 * twice.
	 */
	if (iproto_connection_is_idle(con)) {
		assert(con->disconnect != NULL);
		struct iproto_msg *msg = con->disconnect;
		con->disconnect = NULL;
		cpipe_push(&tx_pipe, msg);
	}
}

/**
 * If there is no space for reading input, we can do one of the
 * following:
 * - try to get a new iobuf, so that it can fit the request.
 *   Always getting a new input buffer when there is no space
 *   makes the server susceptible to input-flood attacks.
 *   Therefore, at most 2 iobufs are used in a single connection,
 *   one is "open", receiving input, and the  other is closed,
 *   flushing output.
 * - stop input and wait until the client reads piled up output,
 *   so the input buffer can be reused. This complements
 *   the previous strategy. It is only safe to stop input if it
 *   is known that there is output. In this case input event
 *   flow will be resumed when all replies to previous requests
 *   are sent, in iproto_connection_gc_iobuf(). Since there are two
 *   buffers, the input is only stopped when both of them
 *   are fully used up.
 *
 * To make this strategy work, each iobuf in use must fit at
 * least one request. Otherwise, iobuf[1] may end
 * up having no data to flush, while iobuf[0] is too small to
 * fit a big incoming request.
 */
static struct iobuf *
iproto_connection_input_iobuf(struct iproto_connection *con)
{
	struct iobuf *oldbuf = con->iobuf[0];

	ssize_t to_read = 3; /* Smallest possible valid request. */

	/* The type code is checked in iproto_enqueue_batch() */
	if (con->parse_size) {
		const char *pos = oldbuf->in.wpos - con->parse_size;
		if (mp_check_uint(pos, oldbuf->in.wpos) <= 0)
			to_read = mp_decode_uint(&pos);
	}

	if (ibuf_unused(&oldbuf->in) >= to_read)
		return oldbuf;

	/** All requests are processed, reuse the buffer. */
	if (ibuf_used(&oldbuf->in) == con->parse_size) {
		ibuf_reserve(&oldbuf->in, to_read);
		return oldbuf;
	}

	if (! iobuf_is_idle(con->iobuf[1])) {
		/*
		 * Wait until the second buffer is flushed
		 * and becomes available for reuse.
		 */
		return NULL;
	}
	struct iobuf *newbuf = con->iobuf[1];

	ibuf_reserve(&newbuf->in, to_read + con->parse_size);
	/*
	 * Discard unparsed data in the old buffer, otherwise it
	 * won't be recycled when all parsed requests are processed.
	 */
	oldbuf->in.wpos -= con->parse_size;
	/* Move the cached request prefix to the new buffer. */
	memcpy(newbuf->in.rpos, oldbuf->in.wpos, con->parse_size);
	newbuf->in.wpos += con->parse_size;
	/*
	 * Rotate buffers. Not strictly necessary, but
	 * helps preserve response order.
	 */
	con->iobuf[1] = oldbuf;
	con->iobuf[0] = newbuf;
	return newbuf;
}

/** Enqueue all requests which were read up. */
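/*
 * On the wire, each request is a MsgPack unsigned integer holding
 * the length of the rest of the packet, followed by a header map
 * and an optional body map (an illustrative layout sketch, not a
 * real capture):
 *
 *	MP_UINT len | MP_MAP header {type, sync, ...} | MP_MAP body
 */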
static inline void
iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
{
	bool stop_input = false;
	while (true) {
		const char *reqstart = in->wpos - con->parse_size;
		const char *pos = reqstart;
		/* Read request length. */
		if (mp_typeof(*pos) != MP_UINT) {
			tnt_raise(ClientError, ER_INVALID_MSGPACK,
				  "packet length");
		}
		if (mp_check_uint(pos, in->wpos) >= 0)
			break;
		uint32_t len = mp_decode_uint(&pos);
		const char *reqend = pos + len;
		if (reqend > in->wpos)
			break;
		struct iproto_msg *msg = iproto_msg_new(con, request_route);
		msg->iobuf = con->iobuf[0];
		IprotoMsgGuard guard(msg);

		xrow_header_decode(&msg->header, &pos, reqend);
		assert(pos == reqend);
		msg->len = reqend - reqstart; /* total request length */
		/*
		 * sic: in case of exception con->parse_size
		 * must not be advanced to stay in sync with
		 * in->rpos.
		 */
		if (msg->header.type >= IPROTO_SELECT &&
		    msg->header.type <= IPROTO_UPSERT) {
			/* Pre-parse request before putting it into the queue */
			if (msg->header.bodycnt == 0) {
				tnt_raise(ClientError, ER_INVALID_MSGPACK,
					  "request type");
			}
			request_create(&msg->request, msg->header.type);
			pos = (const char *) msg->header.body[0].iov_base;
			request_decode(&msg->request, pos,
				       msg->header.body[0].iov_len);
		} else if (msg->header.type == IPROTO_SUBSCRIBE ||
			   msg->header.type == IPROTO_JOIN) {
			/**
			 * Don't mess with the file descriptor
			 * while join is running.
			 */
			ev_io_stop(con->loop, &con->output);
			ev_io_stop(con->loop, &con->input);
			stop_input = true;
		}
		msg->request.header = &msg->header;
		cpipe_push_input(&tx_pipe, guard.release());

		/* Request is parsed */
		con->parse_size -= reqend - reqstart;
		if (con->parse_size == 0 || stop_input)
			break;
	}
	cpipe_flush_input(&tx_pipe);
	/*
	 * Keep reading input, as long as the socket
	 * supplies data.
	 */
	if (!stop_input && !ev_is_active(&con->input))
		ev_feed_event(con->loop, &con->input, EV_READ);
}

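/**
 * ev_io read callback: pick an input buffer with enough space,
 * read from the socket and enqueue every fully read request.
 */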
static void
iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
			   int /* revents */)
{
	struct iproto_connection *con =
		(struct iproto_connection *) watcher->data;
	int fd = con->input.fd;
	assert(fd >= 0);

	try {
		/* Ensure we have sufficient space for the next round.  */
		struct iobuf *iobuf = iproto_connection_input_iobuf(con);
		if (iobuf == NULL) {
			ev_io_stop(loop, &con->input);
			return;
		}

		struct ibuf *in = &iobuf->in;
		/* Read input. */
		int nrd = sio_read(fd, in->wpos, ibuf_unused(in));
		if (nrd < 0) {                  /* Socket is not ready. */
			ev_io_start(loop, &con->input);
			return;
		}
		if (nrd == 0) {                 /* EOF */
			iproto_connection_close(con);
			return;
		}
		/* Update the read position and connection state. */
		in->wpos += nrd;
		con->parse_size += nrd;
		/* Enqueue all requests which are fully read up. */
		iproto_enqueue_batch(con, in);
	} catch (Exception *e) {
		e->log();
		iproto_connection_close(con);
	}
}

/** Get the iobuf which is currently being flushed. */
static inline struct iobuf *
iproto_connection_output_iobuf(struct iproto_connection *con)
{
	if (obuf_used(&con->iobuf[1]->out) > 0)
		return con->iobuf[1];
	/*
	 * Don't try to write from a newer buffer if an older one
	 * exists: in case of a partial write of a newer buffer,
	 * the client may end up getting a salad of different
	 * pieces of replies from both buffers.
	 */
	if (ibuf_used(&con->iobuf[1]->in) == 0 &&
	    obuf_used(&con->iobuf[0]->out) > 0)
		return con->iobuf[0];
	return NULL;
}

/** writev() to the socket and handle the result. */
static int
iproto_flush(struct iobuf *iobuf, struct iproto_connection *con)
{
	int fd = con->output.fd;
	struct obuf_svp *begin = &iobuf->out.wpos;
	struct obuf_svp *end = &iobuf->out.wend;
	assert(begin->used < end->used);
	struct iovec iov[SMALL_OBUF_IOV_MAX+1];
	struct iovec *src = iobuf->out.iov;
	int iovcnt = end->pos - begin->pos + 1;
	/*
	 * iov[i].iov_len may be concurrently modified in tx thread,
	 * but only for the last position.
	 */
	memcpy(iov, src + begin->pos, iovcnt * sizeof(struct iovec));
	sio_add_to_iov(iov, -begin->iov_len);
	/* *Overwrite* iov_len of the last pos as it may be garbage. */
	iov[iovcnt-1].iov_len = end->iov_len - begin->iov_len * (iovcnt == 1);

	ssize_t nwr = sio_writev(fd, iov, iovcnt);
	if (nwr > 0) {
		if (begin->used + nwr == end->used) {
			if (ibuf_used(&iobuf->in) == 0) {
				/* Quickly recycle the buffer if it's idle. */
				assert(end->used == obuf_size(&iobuf->out));
				/* Resets wpos and wend to the zero position. */
				iobuf_reset(iobuf);
			} else { /* Avoid assignment reordering. */
				/* Advance write position. */
				*begin = *end;
			}
			return 0;
		}
		size_t offset = 0;
		int advance = 0;
		advance = sio_move_iov(iov, nwr, &offset);
		begin->used += nwr;             /* advance write position */
		begin->iov_len = advance == 0 ? begin->iov_len + offset: offset;
		begin->pos += advance;
		assert(begin->pos <= end->pos);
	}
	return -1;
}

static void
iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher,
			    int /* revents */)
{
	struct iproto_connection *con = (struct iproto_connection *) watcher->data;

	try {
		struct iobuf *iobuf;
		while ((iobuf = iproto_connection_output_iobuf(con))) {
			if (iproto_flush(iobuf, con) < 0) {
				ev_io_start(loop, &con->output);
				return;
			}
			if (! ev_is_active(&con->input))
				ev_feed_event(loop, &con->input, EV_READ);
		}
		if (ev_is_active(&con->output))
			ev_io_stop(con->loop, &con->output);
	} catch (Exception *e) {
		e->log();
		iproto_connection_close(con);
	}
}

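/**
 * Execute a single request in the tx thread and write the reply
 * (or the error) into the connection's output buffer.
 */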
static void
tx_process_msg(struct cmsg *m)
{
	struct iproto_msg *msg = (struct iproto_msg *) m;
	struct obuf *out = &msg->iobuf->out;
	struct iproto_connection *con = msg->connection;
	struct session *session = msg->connection->session;
	fiber_set_session(fiber(), session);
	fiber_set_user(fiber(), &session->credentials);

	session->sync = msg->header.sync;
	try {
		switch (msg->header.type) {
		case IPROTO_SELECT:
		{
			struct iproto_port port;
			iproto_port_init(&port, out, msg->header.sync);
			struct request *req = &msg->request;
			int rc = box_select((struct port *) &port,
					    req->space_id, req->index_id,
					    req->iterator,
					    req->offset, req->limit,
					    req->key, req->key_end);
			if (rc < 0) {
				/*
				 * This only works if there are no
				 * yields between the moment the
				 * port is first used for
				 * output and is flushed/an error
				 * occurs.
				 */
				if (port.found)
					obuf_rollback_to_svp(out, &port.svp);
				throw (Exception *) box_error_last();
			}
			break;
		}
		case IPROTO_INSERT:
		case IPROTO_REPLACE:
		case IPROTO_UPDATE:
		case IPROTO_DELETE:
		case IPROTO_UPSERT:
		{
			assert(msg->request.type == msg->header.type);
			struct tuple *tuple;
			if (box_process1(&msg->request, &tuple) < 0)
				throw (Exception *) box_error_last();
			struct obuf_svp svp = iproto_prepare_select(out);
			if (tuple)
				tuple_to_obuf(tuple, out);
			iproto_reply_select(out, &svp, msg->header.sync,
					    tuple != 0);
			break;
		}
		case IPROTO_CALL:
			assert(msg->request.type == msg->header.type);
			rmean_collect(rmean_box, msg->request.type, 1);
			box_lua_call(&msg->request, out);
			break;
		case IPROTO_EVAL:
			assert(msg->request.type == msg->header.type);
			rmean_collect(rmean_box, msg->request.type, 1);
			box_lua_eval(&msg->request, out);
			break;
		case IPROTO_AUTH:
		{
			assert(msg->request.type == msg->header.type);
			const char *user = msg->request.key;
			uint32_t len = mp_decode_strl(&user);
			authenticate(user, len, msg->request.tuple,
				     msg->request.tuple_end);
			iproto_reply_ok(out, msg->header.sync);
			break;
		}
		case IPROTO_PING:
			iproto_reply_ok(out, msg->header.sync);
			break;
		case IPROTO_JOIN:
			/*
			 * The watchers were stopped in
			 * iproto_enqueue_batch(); net_send_msg()
			 * will re-activate them as soon as
			 * box_process_join() returns.
			 */
			box_process_join(con->input.fd, &msg->header);
			break;
		case IPROTO_SUBSCRIBE:
			/*
			 * Subscribe never returns - unless there
			 * is an error/exception. In that case
			 * the write watcher will be re-activated
			 * the same way as for JOIN.
			 */
			box_process_subscribe(con->input.fd, &msg->header);
			break;
		default:
			tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE,
				   (uint32_t) msg->header.type);
		}
	} catch (Exception *e) {
		iproto_reply_error(out, e, msg->header.sync);
	}
	msg->write_end = obuf_create_svp(out);
}

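/**
 * Complete a request in the net thread: discard its input bytes,
 * make the reply visible for flushing, and wake up the output
 * watcher or recycle the connection if it is gone.
 */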
static void
net_send_msg(struct cmsg *m)
{
	struct iproto_msg *msg = (struct iproto_msg *) m;
	struct iproto_connection *con = msg->connection;
	struct iobuf *iobuf = msg->iobuf;
	/* Discard request (see iproto_enqueue_batch()) */
	iobuf->in.rpos += msg->len;
	iobuf->out.wend = msg->write_end;
	if ((msg->header.type == IPROTO_SUBSCRIBE ||
	    msg->header.type == IPROTO_JOIN)) {
		assert(! ev_is_active(&con->input));
		ev_io_start(con->loop, &con->input);
	}

	if (evio_has_fd(&con->output)) {
		if (! ev_is_active(&con->output))
			ev_feed_event(con->loop, &con->output, EV_WRITE);
	} else if (iproto_connection_is_idle(con)) {
		iproto_connection_close(con);
	}
	iproto_msg_delete(msg);
}

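/**
 * Format the fixed-size text greeting: server version, process
 * title and a base64-encoded salt for authentication.
 */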
const char *
iproto_greeting(const char *salt)
{
	static __thread char greeting[IPROTO_GREETING_SIZE + 1];
	char base64buf[SESSION_SEED_SIZE * 4 / 3 + 5];

	base64_encode(salt, SESSION_SEED_SIZE, base64buf, sizeof(base64buf));
	snprintf(greeting, sizeof(greeting),
		 "Tarantool %-20s %-32s\n%-63s\n",
		 tarantool_version(), custom_proc_title, base64buf);
	return greeting;
}

/**
 * Handshake a connection: invoke the on-connect trigger
 * and possibly authenticate. Try to send the client an error
 * upon a failure.
 */
static void
tx_process_connect(struct cmsg *m)
{
	struct iproto_msg *msg = (struct iproto_msg *) m;
	struct iproto_connection *con = msg->connection;
	struct obuf *out = &msg->iobuf->out;
	try {              /* connect. */
		con->session = session_create(con->input.fd, con->cookie);
		obuf_dup(out, iproto_greeting(con->session->salt),
			 IPROTO_GREETING_SIZE);
		if (! rlist_empty(&session_on_connect))
			session_run_on_connect_triggers(con->session);
		msg->write_end = obuf_create_svp(out);
	} catch (Exception *e) {
		iproto_reply_error(out, e, 0 /* zero sync for connect error */);
		msg->close_connection = true;
	}
}

/**
 * Send the connect reply (the greeting) to the client, or close
 * the connection if the on_connect trigger failed.
 */
static void
net_send_greeting(struct cmsg *m)
{
	struct iproto_msg *msg = (struct iproto_msg *) m;
	struct iproto_connection *con = msg->connection;
	if (msg->close_connection) {
		struct obuf *out = &msg->iobuf->out;
		try {
			sio_writev(con->output.fd, out->iov,
				   obuf_iovcnt(out));
		} catch (Exception *e) {
			e->log();
		}
		assert(iproto_connection_is_idle(con));
		iproto_connection_close(con);
		iproto_msg_delete(msg);
		return;
	}
	con->iobuf[0]->out.wend = msg->write_end;
	/*
	 * Connect is synchronous, so no one could have been
	 * messing with the connection while it was in
	 * progress.
	 */
	assert(evio_has_fd(&con->output));
	/* Handshake OK, start reading input. */
	ev_feed_event(con->loop, &con->output, EV_WRITE);
	iproto_msg_delete(msg);
}

static struct cmsg_hop connect_route[] = {
	{ tx_process_connect, &net_pipe },
	{ net_send_greeting, NULL },
};

/* }}} */

/**
 * Create a connection and start input.
 */
static void
iproto_on_accept(struct evio_service * /* service */, int fd,
		 struct sockaddr *addr, socklen_t addrlen)
{
	char name[SERVICE_NAME_MAXLEN];
	snprintf(name, sizeof(name), "%s/%s", "iobuf",
		sio_strfaddr(addr, addrlen));

	struct iproto_connection *con;

	con = iproto_connection_new(name, fd, addr);
	/*
	 * Ignore msg allocation failure - the queue size is
	 * fixed so there is a limited number of msgs in
	 * use, all stored in just a few blocks of the memory pool.
	 */
	struct iproto_msg *msg = iproto_msg_new(con, connect_route);
	msg->iobuf = con->iobuf[0];
	msg->close_connection = false;
	cpipe_push(&tx_pipe, msg);
}

static struct evio_service binary; /* iproto binary listener */

/**
 * The network io thread main function:
 * begin serving the message bus.
 */
static void
net_cord_f(va_list /* ap */)
{
	/* Got to be called in every thread using iobuf */
	iobuf_init();
	mempool_create(&iproto_msg_pool, &cord()->slabc,
		       sizeof(struct iproto_msg));
	cpipe_create(&net_pipe);
	mempool_create(&iproto_connection_pool, &cord()->slabc,
		       sizeof(struct iproto_connection));

	evio_service_init(loop(), &binary, "binary",
			  iproto_on_accept, NULL);

	cbus_join(&net_tx_bus, &net_pipe);
	/*
	 * Nothing to do in the fiber so far, the service
	 * will take care of creating events for incoming
	 * connections.
	 */
	fiber_yield();
}

/** Initialize the iproto subsystem and start network io thread */
void
iproto_init()
{
	tx_cord = cord();

	cbus_create(&net_tx_bus);
	cpipe_create(&tx_pipe);
	static struct cpipe_fiber_pool fiber_pool;

	cpipe_fiber_pool_create(&fiber_pool, "iproto", &tx_pipe,
				IPROTO_FIBER_POOL_SIZE,
				IPROTO_FIBER_POOL_IDLE_TIMEOUT);

	static struct cord net_cord;
	if (cord_costart(&net_cord, "iproto", net_cord_f, NULL))
		panic("failed to initialize iproto thread");

	cbus_join(&net_tx_bus, &tx_pipe);
}

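/*
 * A minimal usage sketch (hypothetical call sites; in Tarantool
 * the box subsystem drives this, e.g. from box.cfg{listen}):
 *
 *	iproto_init();              // start the net thread, once
 *	iproto_set_listen("3301");  // bind to a port; raises on error
 *	iproto_set_listen(NULL);    // stop listening
 */
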
/**
 * Since there is no way to "synchronously" change the
 * state of the io thread, to change the listen port
 * we need to bounce a couple of messages to and
 * from this thread.
 */
struct iproto_set_listen_msg: public cmsg
{
	/**
	 * If there was an error setting the listen port,
	 * this will contain the error when the message
	 * returns to the caller.
	 */
	struct diag diag;
	/**
	 * The uri to set.
	 */
	const char *uri;
	/**
	 * The way to tell the caller about the end of
	 * bind.
	 */
	struct cmsg_notify wakeup;
};

/**
 * The bind has finished, notify the caller.
 */
static void
iproto_on_bind(void *arg)
{
	cpipe_push(&tx_pipe, (struct cmsg_notify *) arg);
}

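/**
 * Executed in the net thread: stop the old listener, if any, and
 * start a new one on the requested uri. Errors are saved into
 * msg->diag for the caller.
 */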
static void
iproto_do_set_listen(struct cmsg *m)
{
	struct iproto_set_listen_msg *msg =
		(struct iproto_set_listen_msg *) m;
	try {
		if (evio_service_is_active(&binary))
			evio_service_stop(&binary);

		if (msg->uri != NULL) {
			binary.on_bind = iproto_on_bind;
			binary.on_bind_param = &msg->wakeup;
			evio_service_start(&binary, msg->uri);
		} else {
			iproto_on_bind(&msg->wakeup);
		}
	} catch (Exception *e) {
		diag_move(&fiber()->diag, &msg->diag);
	}
}

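/** Prepare a set-listen message with a one-hop route to the net thread. */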
static void
iproto_set_listen_msg_init(struct iproto_set_listen_msg *msg,
			    const char *uri)
{
	static cmsg_hop route[] = { { iproto_do_set_listen, NULL }, };
	cmsg_init(msg, route);
	msg->uri = uri;
	diag_create(&msg->diag);

	cmsg_notify_init(&msg->wakeup);
}

void
iproto_set_listen(const char *uri)
{
	/**
	 * This is a tricky orchestration for something
	 * that should be pretty easy at first glance:
	 * change the listen uri in the io thread.
	 *
	 * To do it, create a message which sets the new
	 * uri, and another one which will alert the tx
	 * thread when bind() on the new port is done.
	 */
	static struct iproto_set_listen_msg msg;
	iproto_set_listen_msg_init(&msg, uri);

	cpipe_push(&net_pipe, &msg);
	/** Wait for the end of bind. */
	fiber_yield();
	if (! diag_is_empty(&msg.diag)) {
		diag_move(&msg.diag, &fiber()->diag);
		diag_last_error(&fiber()->diag)->raise();
	}
}

/* vim: set foldmethod=marker */