diff --git a/src/iproto.cc b/src/iproto.cc index c10b0f5d454dc699cbd18c6fdc9b705b662e4c17..22b073536bebcfbec0b916354df3477240f97daf 100644 --- a/src/iproto.cc +++ b/src/iproto.cc @@ -42,25 +42,7 @@ #include "session.h" #include "scoped_guard.h" #include "memory.h" - -/* - * struct iproto_header and struct iproto_reply_header - * share common prefix {msg_code, len, sync} - */ - -struct iproto_header { - uint32_t msg_code; - uint32_t len; - uint32_t sync; -} __attribute__((packed)); - -static inline struct iproto_header * -iproto(const char *pos) -{ - return (struct iproto_header *) pos; -} - -static struct iproto_header dummy_header = { 0, 0, 0 }; +#include "msgpuck/msgpuck.h" /* {{{ iproto_queue */ @@ -123,9 +105,15 @@ struct iproto_request { struct iproto_connection *connection; struct iobuf *iobuf; - /* Position of the request in the input buffer. */ - struct iproto_header *header; iproto_request_f process; + /* Request message code. */ + uint32_t code; + /* Request sync. */ + uint32_t sync; + /* Position of request body in the input buffer. */ + const char *body; + /* Length of the body */ + uint32_t len; }; /** @@ -149,29 +137,36 @@ iproto_queue_is_empty(struct iproto_queue *i_queue) } static inline void -iproto_enqueue_request(struct iproto_queue *i_queue, - struct iproto_connection *con, - struct iobuf *iobuf, - struct iproto_header *header, - iproto_request_f process) +iproto_request_init(struct iproto_request *ireq, + struct iproto_connection *con, + struct iobuf *iobuf, + iproto_request_f process, + uint32_t code, uint32_t sync, + const char *body, uint32_t len) +{ + ireq->connection = con; + ireq->iobuf = iobuf; + ireq->process = process; + ireq->code = code; + ireq->sync = sync; + ireq->body = body; + ireq->len = len; +} + +static inline struct iproto_request * +iproto_enqueue_request(struct iproto_queue *i_queue) { /* If the queue is full, invoke the handler to work it off. */ if (i_queue->end == i_queue->size) ev_invoke(&i_queue->watcher, EV_CUSTOM); assert(i_queue->end < i_queue->size); - bool was_empty = iproto_queue_is_empty(i_queue); - struct iproto_request *request = i_queue->queue + i_queue->end++; - - request->connection = con; - request->iobuf = iobuf; - request->header = header; - request->process = process; /* * There were some queued requests, ensure they are * handled. */ - if (was_empty) + if (iproto_queue_is_empty(i_queue)) ev_feed_event(&request_queue.watcher, EV_CUSTOM); + return i_queue->queue + i_queue->end++; } static inline bool @@ -360,22 +355,22 @@ iproto_connection_shutdown(struct iproto_connection *con) * twice. */ if (iproto_connection_is_idle(con)) { - iproto_enqueue_request(&request_queue, con, - con->iobuf[0], &dummy_header, - iproto_process_disconnect); + struct iproto_request *ireq = iproto_enqueue_request(&request_queue); + iproto_request_init(ireq, con, con->iobuf[0], + iproto_process_disconnect, + 0, 0, 0, 0); } } static inline void -iproto_validate_header(struct iproto_header *header, int fd) +iproto_validate_header(uint32_t len) { - (void) fd; - if (header->len > IPROTO_BODY_LEN_MAX) { + if (len > IPROTO_BODY_LEN_MAX) { /* * The package is too big, just close connection for now to * avoid DoS. */ - tnt_raise(IllegalParams, "received package is too big"); + tnt_raise(IllegalParams, "received packet is too big"); } } @@ -407,10 +402,17 @@ iproto_connection_input_iobuf(struct iproto_connection *con) { struct iobuf *oldbuf = con->iobuf[0]; - ssize_t to_read = sizeof(struct iproto_header) + - (con->parse_size >= sizeof(struct iproto_header) ? - iproto(oldbuf->in.end - con->parse_size)->len : 0) - - con->parse_size; + ssize_t to_read = 3; /* Smallest possible valid request. */ + + if (con->parse_size > 0) { + const char *pos = oldbuf->in.end - con->parse_size; + unsigned char c = (unsigned char) *pos; + /* Checked in iproto_enqueue_batch() */ + assert(mp_type_hint[c] == MP_UINT); + + if (mp_parser_hint[c] <= con->parse_size) + to_read = mp_decode_uint(&pos); + } if (ibuf_unused(&oldbuf->in) >= to_read) return oldbuf; @@ -448,26 +450,77 @@ iproto_connection_input_iobuf(struct iproto_connection *con) return newbuf; } -/** Enqueue all requests which were read up. */ static inline void -iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in, int fd) +mp_decode_imap(const char **pos, const char *end, uint32_t *keys) { - while (true) { - if (con->parse_size < sizeof(struct iproto_header)) - break; + assert(*pos < end); + unsigned char c = (unsigned char) **pos; + /* Only a small map can be here. */ + if (mp_type_hint[c] != MP_MAP || mp_parser_hint[c] > end - *pos) { +error: + tnt_raise(IllegalParams, "Invalid MsgPack - iproto header"); + } + uint32_t size = mp_decode_map(pos); + for (int i = 0; i < size; i++) { + if (*pos >= end) + goto error; + + c = (unsigned char) **pos; + if (mp_type_hint[c] != MP_UINT || mp_parser_hint[c] != 1 || + c > IPROTO_SYNC) { + mp_check(pos, end); + mp_check(pos, end); + continue; + } - struct iproto_header * - header = iproto(in->end - con->parse_size); - iproto_validate_header(header, fd); + *pos += 1; + if (*pos >= end) + goto error; - if (con->parse_size < (sizeof(struct iproto_header) + - header->len)) - break; + c = (unsigned char) **pos; + if (mp_type_hint[c] != MP_UINT || + mp_parser_hint[c] + *pos >= end) { + goto error; + } + keys[c] = mp_decode_uint(pos); + } +} - iproto_enqueue_request(&request_queue, con, - con->iobuf[0], header, - iproto_process_request); - con->parse_size -= sizeof(*header) + header->len; +/** Enqueue all requests which were read up. */ +static inline void +iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) +{ + while (con->parse_size) { + const char *reqstart = in->end - con->parse_size; + /* Read request length. */ + unsigned char c = (unsigned char) *reqstart; + if (mp_type_hint[c] != MP_UINT) { + tnt_raise(IllegalParams, + "Invalid MsgPack - packet length"); + } + if (mp_parser_hint[c] + reqstart >= in->end) + break; + const char *pos = reqstart; + uint32_t len = mp_decode_uint(&pos); + iproto_validate_header(len); + const char *reqend = pos + len; + if (reqend > in->end) + break; + /* Parse request header. */ + uint32_t header[2]; + mp_decode_imap(&pos, reqend, header); + + struct iproto_request *ireq = + iproto_enqueue_request(&request_queue); + iproto_request_init(ireq, con, con->iobuf[0], + iproto_process_request, + header[IPROTO_CODE], header[IPROTO_SYNC], + pos, reqend - pos); + + /* the request is parsed */ + con->parse_size -= reqend - reqstart; + /* request length and header can be discarded. */ + in->pos += pos - reqstart; } } @@ -503,7 +556,7 @@ iproto_connection_on_input(struct ev_io *watcher, in->end += nrd; con->parse_size += nrd; /* Enqueue all requests which are fully read up. */ - iproto_enqueue_batch(con, in, fd); + iproto_enqueue_batch(con, in); /* * Keep reading input, as long as the socket * supplies data. @@ -611,52 +664,43 @@ iproto_queue_handler(va_list ap) /* {{{ iproto_process_* functions */ -/** Stack a reply to a single request to the fiber's io vector. */ -static inline void -iproto_reply(struct iproto_port *port, box_process_func callback, - struct obuf *out, struct iproto_header *header) -{ - if (header->msg_code == MSG_PING) - return iproto_reply_ping(out, header->sync); - - /* Make request body point to iproto data */ - char *body = (char *) &header[1]; - iproto_port_init(port, out, header->sync); - try { - struct request request; - request_create(&request, header->msg_code, body, header->len); - callback((struct port *) port, &request); - } catch (const ClientError& e) { - if (port->found) - obuf_rollback_to_svp(out, &port->svp); - iproto_reply_error(out, e, header->sync); - } -} - static void -iproto_process_request(struct iproto_request *request) +iproto_process_request(struct iproto_request *ireq) { - struct iproto_connection *con = request->connection; - struct iproto_header *header = request->header; - struct iobuf *iobuf = request->iobuf; - struct iproto_port port; + struct iobuf *iobuf = ireq->iobuf; + struct iproto_connection *con = ireq->connection; auto scope_guard = make_scoped_guard([=]{ - iobuf->in.pos += sizeof(*header) + header->len; - if (iproto_connection_is_idle(con)) + iobuf->in.pos += ireq->len; + + if (evio_is_active(&con->output)) { + if (! ev_is_active(&con->output)) + ev_feed_event(&con->output, EV_WRITE); + } else if (iproto_connection_is_idle(con)) { iproto_connection_destroy(con); + } }); if (unlikely(! evio_is_active(&con->output))) return; - iproto_reply(&port, *con->handler, &iobuf->out, header); + struct obuf *out = &iobuf->out; - if (unlikely(! evio_is_active(&con->output))) - return; + if (ireq->code == MSG_PING) + return iproto_reply_ping(out, ireq->sync); - if (! ev_is_active(&con->output)) - ev_feed_event(&con->output, EV_WRITE); + /* Make request body point to iproto data */ + struct iproto_port port; + iproto_port_init(&port, out, ireq->sync); + try { + struct request request; + request_create(&request, ireq->code, ireq->body, ireq->len); + (*con->handler)((struct port *) &port, &request); + } catch (const ClientError &e) { + if (port.found) + obuf_rollback_to_svp(out, &port.svp); + iproto_reply_error(out, e, ireq->sync); + } } /** @@ -673,7 +717,7 @@ iproto_process_connect(struct iproto_request *request) try { /* connect. */ con->session = session_create(fd, con->cookie); } catch (const ClientError& e) { - iproto_reply_error(&iobuf->out, e, request->header->sync); + iproto_reply_error(&iobuf->out, e, request->sync); try { iproto_flush(iobuf, fd, &con->write_pos); } catch (const Exception& e) { @@ -721,9 +765,9 @@ iproto_on_accept(struct evio_service *service, int fd, box_process_func *process_fun = (box_process_func*) service->on_accept_param; con = iproto_connection_create(name, fd, addr, process_fun); - iproto_enqueue_request(&request_queue, con, - con->iobuf[0], &dummy_header, - iproto_process_connect); + struct iproto_request *ireq = iproto_enqueue_request(&request_queue); + iproto_request_init(ireq, con, con->iobuf[0], + iproto_process_connect, 0, 0, 0, 0); } /** diff --git a/src/iproto_port.cc b/src/iproto_port.cc index abdb6369a6a1214065789f1ad96f91cd00f08f7b..4c32d76fdf38cbf909b4f5a469f66bcd50a366d2 100644 --- a/src/iproto_port.cc +++ b/src/iproto_port.cc @@ -28,20 +28,6 @@ */ #include "iproto_port.h" -enum iproto_key { - IPROTO_CODE = 0, - IPROTO_SYNC, - IPROTO_SPACE, - IPROTO_INDEX, - IPROTO_TUPLE, - IPROTO_OFFSET, - IPROTO_LIMIT, - IPROTO_ITERATOR, - IPROTO_NAME, - IPROTO_OPS, - IPROTO_DATA, - IPROTO_ERROR, -}; /* m_ - msgpack meta, k_ - key, v_ - value */ struct iproto_header_bin { diff --git a/src/iproto_port.h b/src/iproto_port.h index 74b6ba25e4c97af12dbfdc1bb921174fe91a3003..c16c1017784f61a00bcee824fcd75f36697d9429 100644 --- a/src/iproto_port.h +++ b/src/iproto_port.h @@ -40,6 +40,23 @@ enum { IPROTO_BODY_LEN_MAX = 2147483648UL }; +enum iproto_key { + IPROTO_CODE = 0, + IPROTO_SYNC = 1, + /* Leave a gap for other keys in the header. */ + IPROTO_SPACE = 16, + IPROTO_INDEX = 17, + IPROTO_LIMIT = 18, + IPROTO_OFFSET = 19, + IPROTO_ITERATOR = 20, + /* Leave a gap for other integer values in the body */ + IPROTO_TUPLE = 32, + IPROTO_NAME = 33, + IPROTO_OPS = 34, + IPROTO_DATA = 35, + IPROTO_ERROR = 36, +}; + enum { MSG_PING = 0 }; /** diff --git a/test/lib/tarantool-python b/test/lib/tarantool-python index e400b6699a00044134ae93b8ff83afd7dc48542f..c40899870bc044ed6afd980bcd7b2fd16984db0e 160000 --- a/test/lib/tarantool-python +++ b/test/lib/tarantool-python @@ -1 +1 @@ -Subproject commit e400b6699a00044134ae93b8ff83afd7dc48542f +Subproject commit c40899870bc044ed6afd980bcd7b2fd16984db0e