From 54415be9abff9088febbe91b861008f40aa25f2c Mon Sep 17 00:00:00 2001 From: Konstantin Osipov <kostja@tarantool.org> Date: Wed, 25 Dec 2013 18:26:02 +0400 Subject: [PATCH] [gh-162 authentication] Create a structure for session Create a data structure representing a (potentially authenticated) session. This structure can hold authentication token. --- src/admin.cc | 2 +- src/fiber.cc | 3 +- src/fiber.h | 19 +-- src/iproto.cc | 353 ++++++++++++++++++++------------------------- src/lua/fiber.cc | 4 +- src/lua/session.cc | 20 ++- src/session.cc | 41 ++++-- src/session.h | 10 +- 8 files changed, 227 insertions(+), 225 deletions(-) diff --git a/src/admin.cc b/src/admin.cc index 9f21ece810..00e9315d61 100644 --- a/src/admin.cc +++ b/src/admin.cc @@ -79,7 +79,7 @@ admin_handler(va_list ap) auto scoped_guard = make_scoped_guard([&] { evio_close(&coio); iobuf_delete(iobuf); - session_destroy(fiber->sid); + session_destroy(fiber->session); }); /* diff --git a/src/fiber.cc b/src/fiber.cc index aaf78fc46b..432a116b4b 100644 --- a/src/fiber.cc +++ b/src/fiber.cc @@ -358,6 +358,7 @@ fiber_zombificate() fiber_wakeup(fiber->waiter); rlist_del(&fiber->state); fiber->waiter = NULL; + fiber->session = NULL; fiber_set_name(fiber, "zombie"); fiber->f = NULL; unregister_fid(fiber); @@ -445,7 +446,7 @@ fiber_new(const char *name, void (*f) (va_list)) if (++last_used_fid < 100) last_used_fid = 100; fiber->fid = last_used_fid; - fiber->sid = 0; + fiber->session = NULL; fiber->flags = 0; fiber->waiter = NULL; fiber_set_name(fiber, name); diff --git a/src/fiber.h b/src/fiber.h index 6a1d497e57..cca235035b 100644 --- a/src/fiber.h +++ b/src/fiber.h @@ -84,15 +84,15 @@ struct fiber { /** Fiber id. */ uint32_t fid; /** - * Session id of the session the fiber is running + * The logical user session the fiber is running * on behalf of. The concept of an associated session * is similar to the concept of controlling tty * in a UNIX process. When a fiber is created, - * its sid is 0. If it's running a request on behalf - * of a user connection, it's sid is changed to module- - * generated identifier of the session. + * it has no session. If it's running a request on behalf + * of a user connection, it's session is changed + * to represent this connection. */ - uint32_t sid; + struct session *session; struct rlist link; struct rlist state; @@ -163,14 +163,11 @@ void fiber_sleep(ev_tstamp s); struct tbuf; void fiber_schedule(ev_watcher *watcher, int event __attribute__((unused))); -/** - * Attach this fiber to a session identified by sid and to a cookie. - */ +/** Set or clear this fiber's session. */ static inline void -fiber_set_sid(struct fiber *f, uint32_t sid, uint64_t cookie) +fiber_set_session(struct fiber *f, struct session *session) { - f->sid = sid; - f->cookie = cookie; + f->session = session; } typedef int (*fiber_stat_cb)(struct fiber *f, void *ctx); diff --git a/src/iproto.cc b/src/iproto.cc index 0563b9e7f5..5326d94aa2 100644 --- a/src/iproto.cc +++ b/src/iproto.cc @@ -53,11 +53,11 @@ struct iproto_request; /** * Implementation of an input queue of the box request processor. * - * Socket event handlers read data, determine request boundaries + * Event handlers read data, determine request boundaries * and enqueue requests. Once all input/output events are - * processed, an own event handler is invoked to deal with the - * requests in the queue: it's important that each request is - * processed in a fiber environment. + * processed, an own handler is invoked to deal with the + * requests in the queue. It leases a fiber from a pool + * and runs the request in the fiber. * * @sa iproto_queue_schedule, iproto_handler, iproto_handshake */ @@ -94,7 +94,7 @@ enum { IPROTO_REQUEST_QUEUE_SIZE = 2048, }; -struct iproto_session; +struct iproto_connection; typedef void (*iproto_request_f)(struct iproto_request *); @@ -105,7 +105,7 @@ typedef void (*iproto_request_f)(struct iproto_request *); */ struct iproto_request { - struct iproto_session *session; + struct iproto_connection *connection; struct iobuf *iobuf; /* Position of the request in the input buffer. */ struct iproto_header *header; @@ -134,7 +134,7 @@ iproto_queue_is_empty(struct iproto_queue *i_queue) static inline void iproto_enqueue_request(struct iproto_queue *i_queue, - struct iproto_session *session, + struct iproto_connection *con, struct iobuf *iobuf, struct iproto_header *header, iproto_request_f process) @@ -146,7 +146,7 @@ iproto_enqueue_request(struct iproto_queue *i_queue, bool was_empty = iproto_queue_is_empty(i_queue); struct iproto_request *request = i_queue->queue + i_queue->end++; - request->session = session; + request->connection = con; request->iobuf = iobuf; request->header = header; request->process = process; @@ -216,37 +216,13 @@ iproto_queue_init(struct iproto_queue *i_queue, rlist_create(&i_queue->fiber_cache); } -static inline uint32_t -iproto_session_id(struct iproto_session *session); - -static inline uint64_t -iproto_session_cookie(struct iproto_session *session); - -/** A handler to process all queued requests. */ -static void -iproto_queue_handler(va_list ap) -{ - struct iproto_queue *i_queue = va_arg(ap, struct iproto_queue *); - struct iproto_request request; -restart: - while (iproto_dequeue_request(i_queue, &request)) { - - fiber_set_sid(fiber, iproto_session_id(request.session), iproto_session_cookie(request.session)); - request.process(&request); - } - iproto_cache_fiber(&request_queue); - goto restart; -} - /* }}} */ -/* {{{ iproto_session */ +/* {{{ iproto_connection */ /** Context of a single client connection. */ -struct iproto_session +struct iproto_connection { - /* Cache of iproto_session objects. */ - SLIST_ENTRY(iproto_session) next_in_cache; /** * Two rotating buffers for I/O. Input is always read into * iobuf[0]. As soon as iobuf[0] input buffer becomes full, @@ -276,51 +252,36 @@ struct iproto_session box_process_func *handler; struct ev_io input; struct ev_io output; - /** Session id. */ - uint32_t sid; + /** Logical session. */ + struct session *session; uint64_t cookie; }; -SLIST_HEAD(, iproto_session) iproto_session_cache = - SLIST_HEAD_INITIALIZER(iproto_session_cache); - -static struct mempool iproto_session_pool; +static struct mempool iproto_connection_pool; /** - * A session is idle when the client is gone + * A connection is idle when the client is gone * and there are no outstanding requests in the request queue. - * An idle session can be safely garbage collected. - * Note: a session only becomes idle after iproto_session_shutdown(), + * An idle connection can be safely garbage collected. + * Note: a connection only becomes idle after iproto_connection_shutdown(), * which closes the fd. This is why here the check is for * evio_is_active() (false if fd is closed), not ev_is_active() * (false if event is not started). */ static inline bool -iproto_session_is_idle(struct iproto_session *session) -{ - return !evio_is_active(&session->input) && - ibuf_size(&session->iobuf[0]->in) == 0 && - ibuf_size(&session->iobuf[1]->in) == 0; -} - -static inline uint32_t -iproto_session_id(struct iproto_session *session) +iproto_connection_is_idle(struct iproto_connection *con) { - return session->sid; -} - -static inline uint64_t -iproto_session_cookie(struct iproto_session *session) -{ - return session->cookie; + return !evio_is_active(&con->input) && + ibuf_size(&con->iobuf[0]->in) == 0 && + ibuf_size(&con->iobuf[1]->in) == 0; } static void -iproto_session_on_input(struct ev_io *watcher, - int revents __attribute__((unused))); +iproto_connection_on_input(struct ev_io *watcher, + int revents __attribute__((unused))); static void -iproto_session_on_output(struct ev_io *watcher, - int revents __attribute__((unused))); +iproto_connection_on_output(struct ev_io *watcher, + int revents __attribute__((unused))); static void iproto_process_request(struct iproto_request *request); @@ -331,68 +292,60 @@ iproto_process_connect(struct iproto_request *request); static void iproto_process_disconnect(struct iproto_request *request); -static struct iproto_session * -iproto_session_create(const char *name, int fd, struct sockaddr_in *addr, +static struct iproto_connection * +iproto_connection_create(const char *name, int fd, struct sockaddr_in *addr, box_process_func *param) { - struct iproto_session *session; - if (SLIST_EMPTY(&iproto_session_cache)) { - session = (struct iproto_session *) - mempool_alloc(&iproto_session_pool); - session->input.data = session->output.data = session; - } else { - session = SLIST_FIRST(&iproto_session_cache); - SLIST_REMOVE_HEAD(&iproto_session_cache, next_in_cache); - assert(session->input.fd == -1); - assert(session->output.fd == -1); - } - session->handler = param; - ev_io_init(&session->input, iproto_session_on_input, fd, EV_READ); - ev_io_init(&session->output, iproto_session_on_output, fd, EV_WRITE); - session->iobuf[0] = iobuf_new(name); - session->iobuf[1] = iobuf_new(name); - session->parse_size = 0; - session->write_pos = obuf_create_svp(&session->iobuf[0]->out); - session->sid = 0; - session->cookie = *(uint64_t *) addr; - return session; + struct iproto_connection *con = (struct iproto_connection *) + mempool_alloc(&iproto_connection_pool); + con->input.data = con->output.data = con; + con->handler = param; + ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ); + ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE); + con->iobuf[0] = iobuf_new(name); + con->iobuf[1] = iobuf_new(name); + con->parse_size = 0; + con->write_pos = obuf_create_svp(&con->iobuf[0]->out); + con->session = NULL; + con->cookie = *(uint64_t *) addr; + return con; } -/** Recycle a session. Never throws. */ +/** Recycle a connection. Never throws. */ static inline void -iproto_session_destroy(struct iproto_session *session) +iproto_connection_destroy(struct iproto_connection *con) { - assert(iproto_session_is_idle(session)); - assert(!evio_is_active(&session->output)); - session_destroy(session->sid); /* Never throws. No-op if sid is 0. */ - iobuf_delete(session->iobuf[0]); - iobuf_delete(session->iobuf[1]); - SLIST_INSERT_HEAD(&iproto_session_cache, session, next_in_cache); + assert(iproto_connection_is_idle(con)); + assert(!evio_is_active(&con->output)); + session_destroy(con->session); /* Never throws. No-op if sid is 0. */ + iobuf_delete(con->iobuf[0]); + iobuf_delete(con->iobuf[1]); + mempool_free(&iproto_connection_pool, con); } static inline void -iproto_session_shutdown(struct iproto_session *session) +iproto_connection_shutdown(struct iproto_connection *con) { - ev_io_stop(&session->input); - ev_io_stop(&session->output); - close(session->input.fd); - session->input.fd = session->output.fd = -1; + ev_io_stop(&con->input); + ev_io_stop(&con->output); + close(con->input.fd); + con->input.fd = con->output.fd = -1; /* - * Discard unparsed data, to recycle the session + * Discard unparsed data, to recycle the con * as soon as all parsed data is processed. */ - session->iobuf[0]->in.end -= session->parse_size; + con->iobuf[0]->in.end -= con->parse_size; /* - * If the session is not idle, it is destroyed + * If the con is not idle, it is destroyed * after the last request is handled. Otherwise, * queue a separate request to run on_disconnect() - * trigger and destroy the session. - * Sic: the check is mandatory to not destroy a session + * trigger and destroy the connection. + * Sic: the check is mandatory to not destroy a connection * twice. */ - if (iproto_session_is_idle(session)) { - iproto_enqueue_request(&request_queue, session, - session->iobuf[0], &dummy_header, + if (iproto_connection_is_idle(con)) { + iproto_enqueue_request(&request_queue, con, + con->iobuf[0], &dummy_header, iproto_process_disconnect); } } @@ -416,7 +369,7 @@ iproto_validate_header(struct iproto_header *header, int fd) * - try to get a new iobuf, so that it can fit the request. * Always getting a new input buffer when there is no space * makes the server susceptible to input-flood attacks. - * Therefore, at most 2 iobufs are used in a single session, + * Therefore, at most 2 iobufs are used in a single connection, * one is "open", receiving input, and the other is closed, * flushing output. * - stop input and wait until the client reads piled up output, @@ -424,7 +377,7 @@ iproto_validate_header(struct iproto_header *header, int fd) * the previous strategy. It is only safe to stop input if it * is known that there is output. In this case input event * flow will be resumed when all replies to previous requests - * are sent, in iproto_session_gc_iobuf(). Since there are two + * are sent, in iproto_connection_gc_iobuf(). Since there are two * buffers, the input is only stopped when both of them * are fully used up. * @@ -434,89 +387,88 @@ iproto_validate_header(struct iproto_header *header, int fd) * fit a big incoming request. */ static struct iobuf * -iproto_session_input_iobuf(struct iproto_session *session) +iproto_connection_input_iobuf(struct iproto_connection *con) { - struct iobuf *oldbuf = session->iobuf[0]; + struct iobuf *oldbuf = con->iobuf[0]; ssize_t to_read = sizeof(struct iproto_header) + - (session->parse_size >= sizeof(struct iproto_header) ? - iproto(oldbuf->in.end - session->parse_size)->len : 0) - - session->parse_size; + (con->parse_size >= sizeof(struct iproto_header) ? + iproto(oldbuf->in.end - con->parse_size)->len : 0) - + con->parse_size; if (ibuf_unused(&oldbuf->in) >= to_read) return oldbuf; /** All requests are processed, reuse the buffer. */ - if (ibuf_size(&oldbuf->in) == session->parse_size) { + if (ibuf_size(&oldbuf->in) == con->parse_size) { ibuf_reserve(&oldbuf->in, to_read); return oldbuf; } - if (! iobuf_is_idle(session->iobuf[1])) { + if (! iobuf_is_idle(con->iobuf[1])) { /* * Wait until the second buffer is flushed * and becomes available for reuse. */ return NULL; } - struct iobuf *newbuf = session->iobuf[1]; + struct iobuf *newbuf = con->iobuf[1]; - ibuf_reserve(&newbuf->in, to_read + session->parse_size); + ibuf_reserve(&newbuf->in, to_read + con->parse_size); /* * Discard unparsed data in the old buffer, otherwise it * won't be recycled when all parsed requests are processed. */ - oldbuf->in.end -= session->parse_size; + oldbuf->in.end -= con->parse_size; /* Move the cached request prefix to the new buffer. */ - memcpy(newbuf->in.pos, oldbuf->in.end, session->parse_size); - newbuf->in.end += session->parse_size; + memcpy(newbuf->in.pos, oldbuf->in.end, con->parse_size); + newbuf->in.end += con->parse_size; /* * Rotate buffers. Not strictly necessary, but * helps preserve response order. */ - session->iobuf[1] = oldbuf; - session->iobuf[0] = newbuf; + con->iobuf[1] = oldbuf; + con->iobuf[0] = newbuf; return newbuf; } /** Enqueue all requests which were read up. */ static inline void -iproto_enqueue_batch(struct iproto_session *session, struct ibuf *in, int fd) +iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in, int fd) { - int batch_size; - for (batch_size = 0; ; batch_size++) { - - if (session->parse_size < sizeof(struct iproto_header)) + while (true) { + if (con->parse_size < sizeof(struct iproto_header)) break; struct iproto_header * - header = iproto(in->end - session->parse_size); + header = iproto(in->end - con->parse_size); iproto_validate_header(header, fd); - if (session->parse_size < (sizeof(struct iproto_header) + + if (con->parse_size < (sizeof(struct iproto_header) + header->len)) break; - iproto_enqueue_request(&request_queue, session, - session->iobuf[0], header, + iproto_enqueue_request(&request_queue, con, + con->iobuf[0], header, iproto_process_request); - session->parse_size -= sizeof(*header) + header->len; + con->parse_size -= sizeof(*header) + header->len; } } static void -iproto_session_on_input(struct ev_io *watcher, - int revents __attribute__((unused))) +iproto_connection_on_input(struct ev_io *watcher, + int revents __attribute__((unused))) { - struct iproto_session *session = (struct iproto_session *) watcher->data; - int fd = session->input.fd; + struct iproto_connection *con = + (struct iproto_connection *) watcher->data; + int fd = con->input.fd; assert(fd >= 0); try { /* Ensure we have sufficient space for the next round. */ - struct iobuf *iobuf = iproto_session_input_iobuf(session); + struct iobuf *iobuf = iproto_connection_input_iobuf(con); if (iobuf == NULL) { - ev_io_stop(&session->input); + ev_io_stop(&con->input); return; } @@ -524,45 +476,45 @@ iproto_session_on_input(struct ev_io *watcher, /* Read input. */ int nrd = sio_read(fd, in->end, ibuf_unused(in)); if (nrd < 0) { /* Socket is not ready. */ - ev_io_start(&session->input); + ev_io_start(&con->input); return; } if (nrd == 0) { /* EOF */ - iproto_session_shutdown(session); + iproto_connection_shutdown(con); return; } - /* Update the read position and session state. */ + /* Update the read position and connection state. */ in->end += nrd; - session->parse_size += nrd; + con->parse_size += nrd; /* Enqueue all requests which are fully read up. */ - iproto_enqueue_batch(session, in, fd); + iproto_enqueue_batch(con, in, fd); /* * Keep reading input, as long as the socket * supplies data. */ - if (!ev_is_active(&session->input)) - ev_feed_event(&session->input, EV_READ); + if (!ev_is_active(&con->input)) + ev_feed_event(&con->input, EV_READ); } catch (const Exception& e) { e.log(); - iproto_session_shutdown(session); + iproto_connection_shutdown(con); } } /** Get the iobuf which is currently being flushed. */ static inline struct iobuf * -iproto_session_output_iobuf(struct iproto_session *session) +iproto_connection_output_iobuf(struct iproto_connection *con) { - if (obuf_size(&session->iobuf[1]->out)) - return session->iobuf[1]; + if (obuf_size(&con->iobuf[1]->out)) + return con->iobuf[1]; /* * Don't try to write from a newer buffer if an older one * exists: in case of a partial write of a newer buffer, * the client may end up getting a salad of different * pieces of replies from both buffers. */ - if (ibuf_size(&session->iobuf[1]->in) == 0 && - obuf_size(&session->iobuf[0]->out)) - return session->iobuf[0]; + if (ibuf_size(&con->iobuf[1]->in) == 0 && + obuf_size(&con->iobuf[0]->out)) + return con->iobuf[0]; return NULL; } @@ -580,7 +532,7 @@ iproto_flush(struct iobuf *iobuf, int fd, struct obuf_svp *svp) nwr = sio_writev(fd, iov, iovcnt); sio_add_to_iov(iov, svp->iov_len); - } catch (const Exception&) { + } catch (const Exception &) { sio_add_to_iov(iov, svp->iov_len); throw; } @@ -598,36 +550,52 @@ iproto_flush(struct iobuf *iobuf, int fd, struct obuf_svp *svp) } static void -iproto_session_on_output(struct ev_io *watcher, +iproto_connection_on_output(struct ev_io *watcher, int revent __attribute__((unused))) { - struct iproto_session *session = (struct iproto_session *) watcher->data; - int fd = session->output.fd; - struct obuf_svp *svp = &session->write_pos; + struct iproto_connection *con = (struct iproto_connection *) watcher->data; + int fd = con->output.fd; + struct obuf_svp *svp = &con->write_pos; try { struct iobuf *iobuf; - while ((iobuf = iproto_session_output_iobuf(session))) { + while ((iobuf = iproto_connection_output_iobuf(con))) { if (iproto_flush(iobuf, fd, svp) < 0) { - ev_io_start(&session->output); + ev_io_start(&con->output); return; } - if (! ev_is_active(&session->input)) - ev_feed_event(&session->input, EV_READ); + if (! ev_is_active(&con->input)) + ev_feed_event(&con->input, EV_READ); } - if (ev_is_active(&session->output)) - ev_io_stop(&session->output); + if (ev_is_active(&con->output)) + ev_io_stop(&con->output); } catch (const Exception& e) { e.log(); - iproto_session_shutdown(session); + iproto_connection_shutdown(con); + } +} + +/** A handler to process all queued requests. */ +static void +iproto_queue_handler(va_list ap) +{ + struct iproto_queue *i_queue = va_arg(ap, struct iproto_queue *); + struct iproto_request request; +restart: + while (iproto_dequeue_request(i_queue, &request)) { + + fiber_set_session(fiber, request.connection->session); + request.process(&request); } + iproto_cache_fiber(&request_queue); + goto restart; } /* }}} */ /* {{{ iproto_process_* functions */ -/** Stack reply to 'ping' packet. */ +/** Stack a reply to 'ping' packet. */ static inline void iproto_reply_ping(struct obuf *out, struct iproto_header *req) { @@ -675,27 +643,27 @@ iproto_reply(struct iproto_port *port, box_process_func callback, static void iproto_process_request(struct iproto_request *request) { - struct iproto_session *session = request->session; + struct iproto_connection *con = request->connection; struct iproto_header *header = request->header; struct iobuf *iobuf = request->iobuf; struct iproto_port port; auto scope_guard = make_scoped_guard([=]{ iobuf->in.pos += sizeof(*header) + header->len; - if (iproto_session_is_idle(session)) - iproto_session_destroy(session); + if (iproto_connection_is_idle(con)) + iproto_connection_destroy(con); }); - if (unlikely(! evio_is_active(&session->output))) + if (unlikely(! evio_is_active(&con->output))) return; - iproto_reply(&port, *session->handler, &iobuf->out, header); + iproto_reply(&port, *con->handler, &iobuf->out, header); - if (unlikely(! evio_is_active(&session->output))) + if (unlikely(! evio_is_active(&con->output))) return; - if (! ev_is_active(&session->output)) - ev_feed_event(&session->output, EV_WRITE); + if (! ev_is_active(&con->output)) + ev_feed_event(&con->output, EV_WRITE); } /** @@ -706,48 +674,47 @@ iproto_process_request(struct iproto_request *request) static void iproto_process_connect(struct iproto_request *request) { - struct iproto_session *session = request->session; + struct iproto_connection *con = request->connection; struct iobuf *iobuf = request->iobuf; - int fd = session->input.fd; + int fd = con->input.fd; try { /* connect. */ - session->sid = session_create(fd, session->cookie); + con->session = session_create(fd, con->cookie); } catch (const ClientError& e) { iproto_reply_error(&iobuf->out, request->header, e); try { - iproto_flush(iobuf, fd, &session->write_pos); + iproto_flush(iobuf, fd, &con->write_pos); } catch (const Exception& e) { e.log(); } - iproto_session_shutdown(session); + iproto_connection_shutdown(con); return; } catch (const Exception& e) { e.log(); - assert(session->sid == 0); - iproto_session_shutdown(session); + assert(con->session == NULL); + iproto_connection_shutdown(con); return; } /* * Connect is synchronous, so no one could have been - * messing up with the session while it was in + * messing up with the connection while it was in * progress. */ - assert(evio_is_active(&session->input)); + assert(evio_is_active(&con->input)); /* Handshake OK, start reading input. */ - ev_feed_event(&session->input, EV_READ); + ev_feed_event(&con->input, EV_READ); } static void iproto_process_disconnect(struct iproto_request *request) { - fiber_set_sid(fiber, request->session->sid, request->session->cookie); /* Runs the trigger, which may yield. */ - iproto_session_destroy(request->session); + iproto_connection_destroy(request->connection); } /** }}} */ /** - * Create a session context and start input. + * Create a connection context and start input. */ static void iproto_on_accept(struct evio_service *service, int fd, @@ -756,13 +723,13 @@ iproto_on_accept(struct evio_service *service, int fd, char name[SERVICE_NAME_MAXLEN]; snprintf(name, sizeof(name), "%s/%s", "iobuf", sio_strfaddr(addr)); - struct iproto_session *session; + struct iproto_connection *con; box_process_func *process_fun = (box_process_func*) service->on_accept_param; - session = iproto_session_create(name, fd, addr, process_fun); - iproto_enqueue_request(&request_queue, session, - session->iobuf[0], &dummy_header, + con = iproto_connection_create(name, fd, addr, process_fun); + iproto_enqueue_request(&request_queue, con, + con->iobuf[0], &dummy_header, iproto_process_connect); } @@ -795,7 +762,7 @@ iproto_init(const char *bind_ipaddr, int primary_port, } iproto_queue_init(&request_queue, IPROTO_REQUEST_QUEUE_SIZE, iproto_queue_handler); - mempool_create(&iproto_session_pool, slabc_runtime, - sizeof(struct iproto_session)); + mempool_create(&iproto_connection_pool, slabc_runtime, + sizeof(struct iproto_connection)); } diff --git a/src/lua/fiber.cc b/src/lua/fiber.cc index a5c90887ea..b7814cd453 100644 --- a/src/lua/fiber.cc +++ b/src/lua/fiber.cc @@ -377,7 +377,7 @@ lbox_fiber_detach(struct lua_State *L) /* Request a detach. */ lua_pushinteger(L, DETACH); /* A detached fiber has no associated session. */ - fiber_set_sid(fiber, 0, 0); + fiber_set_session(fiber, NULL); fiber_yield_to(caller); return 0; } @@ -471,7 +471,7 @@ lbox_fiber_create(struct lua_State *L) struct fiber *f = fiber_new("lua", box_lua_fiber_run); /* Preserve the session in a child fiber. */ - fiber_set_sid(f, fiber->sid, fiber->cookie); + fiber_set_session(f, fiber->session); /* Initially the fiber is cancellable */ f->flags |= FIBER_USER_MODE | FIBER_CANCELLABLE; diff --git a/src/lua/session.cc b/src/lua/session.cc index 82ed27b522..054f8b0ddd 100644 --- a/src/lua/session.cc +++ b/src/lua/session.cc @@ -53,7 +53,7 @@ static const char *sessionlib_name = "box.session"; static int lbox_session_id(struct lua_State *L) { - lua_pushnumber(L, fiber->sid); + lua_pushnumber(L, fiber->session ? fiber->session->id : 0); return 1; } @@ -71,6 +71,21 @@ lbox_session_exists(struct lua_State *L) return 1; } +/** + * Check whether or not a session exists. + */ +static int +lbox_session_fd(struct lua_State *L) +{ + if (lua_gettop(L) != 1) + luaL_error(L, "session.fd(sid): bad arguments"); + + uint32_t sid = luaL_checkint(L, -1); + lua_pushnumber(L, session_fd(sid)); + return 1; +} + + /** * Pretty print peer name. */ @@ -81,7 +96,7 @@ lbox_session_peer(struct lua_State *L) luaL_error(L, "session.peer(sid): bad arguments"); uint32_t sid = lua_gettop(L) == 1 ? - luaL_checkint(L, -1) : fiber->sid; + luaL_checkint(L, -1) : fiber->session->id; int fd = session_fd(sid); struct sockaddr_in addr; @@ -144,6 +159,7 @@ tarantool_lua_session_init(struct lua_State *L) { static const struct luaL_reg sessionlib[] = { {"id", lbox_session_id}, + {"fd", lbox_session_fd}, {"exists", lbox_session_exists}, {"peer", lbox_session_peer}, {"on_connect", lbox_session_on_connect}, diff --git a/src/session.cc b/src/session.cc index 6973db2313..f1aa22ba67 100644 --- a/src/session.cc +++ b/src/session.cc @@ -28,6 +28,7 @@ */ #include "session.h" #include "fiber.h" +#include "memory.h" #include "assoc.h" #include "trigger.h" @@ -38,24 +39,31 @@ uint32_t sid_max; static struct mh_i32ptr_t *session_registry; +struct mempool session_pool; + RLIST_HEAD(session_on_connect); RLIST_HEAD(session_on_disconnect); -uint32_t +struct session * session_create(int fd, uint64_t cookie) { + struct session *session = (struct session *) + mempool_alloc(&session_pool); /* Return the next sid rolling over the reserved value of 0. */ while (++sid_max == 0) ; - uint32_t sid = sid_max; + session->id = sid_max; + session->fd = fd; + session->cookie = cookie; struct mh_i32ptr_node_t node; - node.key = sid; - node.val = (void *) (intptr_t) fd; + node.key = session->id; + node.val = session; mh_int_t k = mh_i32ptr_put(session_registry, &node, NULL, NULL); if (k == mh_end(session_registry)) { + mempool_free(&session_pool, session); tnt_raise(ClientError, ER_MEMORY_ISSUE, "session hash", "new session"); } @@ -63,22 +71,24 @@ session_create(int fd, uint64_t cookie) * Run the trigger *after* setting the current * fiber sid. */ - fiber_set_sid(fiber, sid, cookie); + fiber_set_session(fiber, session); try { trigger_run(&session_on_connect, NULL); } catch (const Exception& e) { - fiber_set_sid(fiber, 0, 0); + fiber_set_session(fiber, NULL); mh_i32ptr_remove(session_registry, &node, NULL); + mempool_free(&session_pool, session); throw; } - return sid; + return session; } void -session_destroy(uint32_t sid) +session_destroy(struct session *session) { - if (sid == 0) /* no-op for a dead session. */ + if (session == NULL) /* no-op for a dead session. */ return; + fiber_set_session(fiber, session); try { trigger_run(&session_on_disconnect, NULL); @@ -87,17 +97,21 @@ session_destroy(uint32_t sid) } catch (...) { /* catch all. */ } - session_storage_cleanup(sid); - struct mh_i32ptr_node_t node = { sid, NULL }; + session_storage_cleanup(session->id); + struct mh_i32ptr_node_t node = { session->id, NULL }; mh_i32ptr_remove(session_registry, &node, NULL); + mempool_free(&session_pool, session); } int session_fd(uint32_t sid) { mh_int_t k = mh_i32ptr_find(session_registry, sid, NULL); - return k == mh_end(session_registry) ? - -1 : (intptr_t) mh_i32ptr_node(session_registry, k)->val; + if (k == mh_end(session_registry)) + return -1; + struct session *session = (struct session *) + mh_i32ptr_node(session_registry, k)->val; + return session->fd; } void @@ -106,6 +120,7 @@ session_init() session_registry = mh_i32ptr_new(); if (session_registry == NULL) panic("out of memory"); + mempool_create(&session_pool, slabc_runtime, sizeof(struct session)); } void diff --git a/src/session.h b/src/session.h index 3695eb0492..1ef46bd588 100644 --- a/src/session.h +++ b/src/session.h @@ -39,6 +39,12 @@ * 0 sid is reserved to mean 'no session'. */ +struct session { + uint32_t id; + int fd; + uint64_t cookie; +}; + /** * Create a session. * Invokes a Lua trigger box.session.on_connect if it is @@ -50,7 +56,7 @@ * @exception tnt_Exception or lua error if session * trigger fails or runs out of resources. */ -uint32_t +struct session * session_create(int fd, uint64_t cookie); /** @@ -63,7 +69,7 @@ session_create(int fd, uint64_t cookie); * @exception none */ void -session_destroy(uint32_t sid); +session_destroy(struct session *); /** -- GitLab