diff --git a/cfg/core_cfg.cfg_tmpl b/cfg/core_cfg.cfg_tmpl index c2b47ca4e651c77f5152537c314b810cc253d7b4..edab8bacb8cb2bd022182bc5bd60a84506cf6492 100644 --- a/cfg/core_cfg.cfg_tmpl +++ b/cfg/core_cfg.cfg_tmpl @@ -7,8 +7,8 @@ username=NULL, ro # In local hot standby mode the server only accepts reads. local_hot_standby=false, ro -# tarantool bind ip address, applies to master -# and replication ports. INADDR_ANY is the default value. +# tarantool bind ip address, applies to primary port. +# INADDR_ANY is the default value. bind_ipaddr="INADDR_ANY", ro # save core on abort/assert @@ -19,9 +19,6 @@ coredump=false, ro # used for admin's connections admin_port=0, ro -# Replication clients should use this port (bind_ipaddr:replication_port). -replication_port=0, ro - # Log verbosity, possible values: SYS_ERROR = 1, ERROR=2, CRIT=3, WARN=4, INFO=5(default), DEBUG=6 log_level=5 diff --git a/cfg/tarantool_box_cfg.c b/cfg/tarantool_box_cfg.c index 2545277132d34236b5aa26293ce22ebc483c72f2..faf0c1407b0e47b7fe17a133d0d417562acab909 100644 --- a/cfg/tarantool_box_cfg.c +++ b/cfg/tarantool_box_cfg.c @@ -33,7 +33,6 @@ init_tarantool_cfg(tarantool_cfg *c) { c->bind_ipaddr = NULL; c->coredump = false; c->admin_port = 0; - c->replication_port = 0; c->log_level = 0; c->slab_alloc_arena = 0; c->slab_alloc_minimal = 0; @@ -69,7 +68,6 @@ fill_default_tarantool_cfg(tarantool_cfg *c) { if (c->bind_ipaddr == NULL) return CNF_NOMEMORY; c->coredump = false; c->admin_port = 0; - c->replication_port = 0; c->log_level = 5; c->slab_alloc_arena = 1; c->slab_alloc_minimal = 64; @@ -122,9 +120,6 @@ static NameAtom _name__coredump[] = { static NameAtom _name__admin_port[] = { { "admin_port", -1, NULL } }; -static NameAtom _name__replication_port[] = { - { "replication_port", -1, NULL } -}; static NameAtom _name__log_level[] = { { "log_level", -1, NULL } }; @@ -317,20 +312,6 @@ acceptValue(tarantool_cfg* c, OptDef* opt, int check_rdonly) { return CNF_RDONLY; c->admin_port = i32; } - else if ( cmpNameAtoms( opt->name, _name__replication_port) ) { - if (opt->paramType != scalarType ) - return CNF_WRONGTYPE; - c->__confetti_flags &= ~CNF_FLAG_STRUCT_NOTSET; - errno = 0; - long int i32 = strtol(opt->paramValue.scalarval, NULL, 10); - if (i32 == 0 && errno == EINVAL) - return CNF_WRONGINT; - if ( (i32 == LONG_MIN || i32 == LONG_MAX) && errno == ERANGE) - return CNF_WRONGRANGE; - if (check_rdonly && c->replication_port != i32) - return CNF_RDONLY; - c->replication_port = i32; - } else if ( cmpNameAtoms( opt->name, _name__log_level) ) { if (opt->paramType != scalarType ) return CNF_WRONGTYPE; @@ -756,7 +737,6 @@ typedef enum IteratorState { S_name__bind_ipaddr, S_name__coredump, S_name__admin_port, - S_name__replication_port, S_name__log_level, S_name__slab_alloc_arena, S_name__slab_alloc_minimal, @@ -855,17 +835,6 @@ tarantool_cfg_iterator_next(tarantool_cfg_iterator_t* i, tarantool_cfg *c, char } sprintf(*v, "%"PRId32, c->admin_port); snprintf(buf, PRINTBUFLEN-1, "admin_port"); - i->state = S_name__replication_port; - return buf; - case S_name__replication_port: - *v = malloc(32); - if (*v == NULL) { - free(i); - out_warning(CNF_NOMEMORY, "No memory to output value"); - return NULL; - } - sprintf(*v, "%"PRId32, c->replication_port); - snprintf(buf, PRINTBUFLEN-1, "replication_port"); i->state = S_name__log_level; return buf; case S_name__log_level: @@ -1145,7 +1114,6 @@ dup_tarantool_cfg(tarantool_cfg* dst, tarantool_cfg* src) { return CNF_NOMEMORY; dst->coredump = src->coredump; dst->admin_port = src->admin_port; - dst->replication_port = src->replication_port; dst->log_level = src->log_level; dst->slab_alloc_arena = src->slab_alloc_arena; dst->slab_alloc_minimal = src->slab_alloc_minimal; @@ -1259,11 +1227,6 @@ cmp_tarantool_cfg(tarantool_cfg* c1, tarantool_cfg* c2, int only_check_rdonly) { return diff; } - if (c1->replication_port != c2->replication_port) { - snprintf(diff, PRINTBUFLEN - 1, "%s", "c->replication_port"); - - return diff; - } if (!only_check_rdonly) { if (c1->log_level != c2->log_level) { snprintf(diff, PRINTBUFLEN - 1, "%s", "c->log_level"); diff --git a/cfg/tarantool_box_cfg.h b/cfg/tarantool_box_cfg.h index c88875c2c4f52abd1569060a2b4ec80fcdc27e1c..4c420903e4b90e35a95b0e74804691d8f845cc21 100644 --- a/cfg/tarantool_box_cfg.h +++ b/cfg/tarantool_box_cfg.h @@ -29,8 +29,8 @@ typedef struct tarantool_cfg { confetti_bool_t local_hot_standby; /* - * tarantool bind ip address, applies to master - * and replication ports. INADDR_ANY is the default value. + * tarantool bind ip address, applies to primary port. + * INADDR_ANY is the default value. */ char* bind_ipaddr; @@ -46,9 +46,6 @@ typedef struct tarantool_cfg { */ int32_t admin_port; - /* Replication clients should use this port (bind_ipaddr:replication_port). */ - int32_t replication_port; - /* Log verbosity, possible values: SYS_ERROR = 1, ERROR=2, CRIT=3, WARN=4, INFO=5(default), DEBUG=6 */ int32_t log_level; diff --git a/src/box/box.cc b/src/box/box.cc index 6a851f76f8e69d89c5a8630dce7e49555dabd819..5146aaa5204f5f03e5f68d9345f1ef2e7509b8cc 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -37,6 +37,7 @@ extern "C" { #include <errcode.h> #include <recovery.h> +#include "replica.h" #include <log_io.h> #include <pickle.h> #include <say.h> @@ -290,14 +291,14 @@ box_init() recovery_setup_panic(recovery_state, cfg.panic_on_snap_error, cfg.panic_on_wal_error); stat_base = stat_register(iproto_request_type_strs, - IPROTO_REQUEST_MAX); + IPROTO_DML_REQUEST_MAX); recover_snap(recovery_state, cfg.replication_source); space_end_recover_snapshot(); recover_existing_wals(recovery_state); space_end_recover(); - stat_cleanup(stat_base, IPROTO_REQUEST_MAX); + stat_cleanup(stat_base, IPROTO_DML_REQUEST_MAX); title("orphan", NULL); if (cfg.local_hot_standby) { say_info("starting local hot standby"); diff --git a/src/box/lua/box_net.lua b/src/box/lua/box_net.lua index 2f86d071de5dc5fc165d379cd1379b9283881c40..2c7ce7b7c8b744c0a61bb392943626438e58abc9 100644 --- a/src/box/lua/box_net.lua +++ b/src/box/lua/box_net.lua @@ -52,7 +52,6 @@ local function rpc_index(r, name) o.path = path o.method = name - printf("path: %s", path) setmetatable(o, { __index = rpc_index, __call = rpc_call, @@ -74,7 +73,7 @@ box.net = { -- routes requests to a remote. -- box = { - PING = 0, + PING = 64, SELECT = 1, INSERT = 2, REPLACE = 3, diff --git a/src/box/request.cc b/src/box/request.cc index 0f2e45c8c51a037f978bf869284a7688287248e3..230942762c34eb92eda903a6e2973a2f3b5fa762 100644 --- a/src/box/request.cc +++ b/src/box/request.cc @@ -181,11 +181,8 @@ execute_delete(struct request *request, struct txn *txn, struct port *port) void request_check_type(uint32_t type) { - if (type < IPROTO_SELECT || type >= IPROTO_REQUEST_MAX) { - say_error("Unsupported request = %u", (unsigned) type); - tnt_raise(IllegalParams, "unsupported command code, " - "check the error log"); - } + if (type < IPROTO_SELECT || type >= IPROTO_DML_REQUEST_MAX) + tnt_raise(IllegalParams, "unknown request type %u", type); } void diff --git a/src/exception.cc b/src/exception.cc index 64d739dc6842746b759b8b505d53d2f014bbfdd9..d42b42cdbc3259b5da5c05d0fc426c74f17d135e 100644 --- a/src/exception.cc +++ b/src/exception.cc @@ -143,13 +143,6 @@ ClientError::log() const m_errmsg); } -IllegalParams::IllegalParams(const char *file, unsigned line, const char *msg) - : LoggedError(file, line, ER_ILLEGAL_PARAMS, msg) -{ - /* nothing */ -} - - ErrorInjection::ErrorInjection(const char *file, unsigned line, const char *msg) : LoggedError(file, line, ER_INJECTION, msg) { diff --git a/src/exception.h b/src/exception.h index a98317ed68eb53c3daf88b895dbceb3a80df8e86..3cf645e399b1d833505128745ca4b92d0c4a87c6 100644 --- a/src/exception.h +++ b/src/exception.h @@ -136,7 +136,11 @@ class LoggedError: public ClientError { class IllegalParams: public LoggedError { public: - IllegalParams(const char *file, unsigned line, const char *msg); + template <typename ... Args> + IllegalParams(const char *file, unsigned line, const char *format, + Args ... args) + :LoggedError(file, line, ER_ILLEGAL_PARAMS, + format, args...) {} }; class ErrorInjection: public LoggedError { diff --git a/src/iobuf.h b/src/iobuf.h index ff65bd70ff41de4ad2e161da83ba306e9b4186ee..79cb689ae5e336f200e895b2d7186bf5903a3753 100644 --- a/src/iobuf.h +++ b/src/iobuf.h @@ -71,6 +71,7 @@ ibuf_reserve(struct ibuf *ibuf, size_t sz); static inline size_t ibuf_size(struct ibuf *ibuf) { + assert(ibuf->end >= ibuf->pos); return ibuf->end - ibuf->pos; } diff --git a/src/iproto.cc b/src/iproto.cc index cad3173ab5fe6d7d8efbdd90cab585d099ce70d9..e0af04080c77c934568babe0777a67233cc6d999 100644 --- a/src/iproto.cc +++ b/src/iproto.cc @@ -43,11 +43,75 @@ #include "scoped_guard.h" #include "memory.h" #include "msgpuck/msgpuck.h" +#include "replication.h" + +class IprotoConnectionShutdown: public Exception +{ +public: + IprotoConnectionShutdown(const char *file, int line) + :Exception(file, line) {} + virtual void log() const; +}; + +void +IprotoConnectionShutdown::log() const +{} + +/* {{{ iproto_request - declaration */ + +struct iproto_connection; + +typedef void (*iproto_request_f)(struct iproto_request *); + +/** + * A single request from the client. All requests + * from all clients are queued into a single queue + * and processed in FIFO order. + */ +struct iproto_request +{ + struct iproto_connection *connection; + struct iobuf *iobuf; + struct session *session; + iproto_request_f process; + /* Request message code and sync. */ + uint32_t header[2]; + /* Box request, if this is a DML */ + struct request request; +}; + +struct mempool iproto_request_pool; + +static struct iproto_request * +iproto_request_new(struct iproto_connection *con, + iproto_request_f process); + +static void +iproto_process_connect(struct iproto_request *request); + +static void +iproto_process_disconnect(struct iproto_request *request); + +static void +iproto_process_dml(struct iproto_request *request); + +struct IprotoRequestGuard { + struct iproto_request *ireq; + IprotoRequestGuard(struct iproto_request *ireq_arg):ireq(ireq_arg) {} + ~IprotoRequestGuard() + { if (ireq) mempool_free(&iproto_request_pool, ireq); } + struct iproto_request *release() + { struct iproto_request *tmp = ireq; ireq = NULL; return tmp; } +}; + +/* }}} */ /* {{{ iproto_queue */ struct iproto_request; +enum { IPROTO_REQUEST_QUEUE_SIZE = 2048, }; + /** * Implementation of an input queue of the box request processor. * @@ -57,21 +121,12 @@ struct iproto_request; * requests in the queue. It leases a fiber from a pool * and runs the request in the fiber. * - * @sa iproto_queue_schedule, iproto_handler, iproto_handshake + * @sa iproto_queue_schedule */ - struct iproto_queue { - /** - * Ring buffer of fixed size, preallocated - * during initialization. - */ - struct iproto_request *queue; - /** - * Main function of the fiber invoked to handle - * all outstanding tasks in this queue. - */ - void (*handler)(va_list); + /** Ring buffer of fixed size */ + struct iproto_request *queue[IPROTO_REQUEST_QUEUE_SIZE]; /** * Cache of fibers which work on requests * in this queue. @@ -88,33 +143,11 @@ struct iproto_queue int size; }; -enum { - IPROTO_REQUEST_QUEUE_SIZE = 2048, -}; - -struct iproto_connection; - -typedef void (*iproto_request_f)(struct iproto_request *); - -/** - * A single request from the client. All requests - * from all clients are queued into a single queue - * and processed in FIFO order. - */ -struct iproto_request +static inline bool +iproto_queue_is_empty(struct iproto_queue *i_queue) { - struct iproto_connection *connection; - struct iobuf *iobuf; - iproto_request_f process; - /* Request message code. */ - uint32_t code; - /* Request sync. */ - uint32_t sync; - /* Position of request body in the input buffer. */ - const char *body; - /* Length of the body */ - uint32_t len; -}; + return i_queue->begin == i_queue->end; +} /** * A single global queue for all requests in all connections. All @@ -123,38 +156,15 @@ struct iproto_request * execute disconnect triggers. A few notes about these triggers: * - they need to be run in a fiber * - unlike an ordinary request failure, on_connect trigger - * failure must lead to connection shutdown. - * - as long as on_connect trigger can be used for client - * authentication, it must be processed before any other request - * on this connection. + * failure must lead to connection close. + * - on_connect trigger must be processed before any other + * request on this connection. */ static struct iproto_queue request_queue; -static inline bool -iproto_queue_is_empty(struct iproto_queue *i_queue) -{ - return i_queue->begin == i_queue->end; -} - -static inline void -iproto_request_init(struct iproto_request *ireq, - struct iproto_connection *con, - struct iobuf *iobuf, - iproto_request_f process, - uint32_t code, uint32_t sync, - const char *body, uint32_t len) -{ - ireq->connection = con; - ireq->iobuf = iobuf; - ireq->process = process; - ireq->code = code; - ireq->sync = sync; - ireq->body = body; - ireq->len = len; -} - -static inline struct iproto_request * -iproto_enqueue_request(struct iproto_queue *i_queue) +static void +iproto_queue_push(struct iproto_queue *i_queue, + struct iproto_request *request) { /* If the queue is full, invoke the handler to work it off. */ if (i_queue->end == i_queue->size) @@ -166,29 +176,40 @@ iproto_enqueue_request(struct iproto_queue *i_queue) */ if (iproto_queue_is_empty(i_queue)) ev_feed_event(loop(), &request_queue.watcher, EV_CUSTOM); - return i_queue->queue + i_queue->end++; + i_queue->queue[i_queue->end++] = request; } -static inline bool -iproto_dequeue_request(struct iproto_queue *i_queue, - struct iproto_request *out) +static struct iproto_request * +iproto_queue_pop(struct iproto_queue *i_queue) { if (i_queue->begin == i_queue->end) - return false; - struct iproto_request *request = i_queue->queue + i_queue->begin++; + return NULL; + struct iproto_request *request = i_queue->queue[i_queue->begin++]; if (i_queue->begin == i_queue->end) i_queue->begin = i_queue->end = 0; - *out = *request; - return true; + return request; } -/** Put the current fiber into a queue fiber cache. */ -static inline void -iproto_cache_fiber(struct iproto_queue *i_queue) +/** + * Main function of the fiber invoked to handle all outstanding + * tasks in a queue. + */ +static void +iproto_queue_handler(va_list ap) { + struct iproto_queue *i_queue = va_arg(ap, struct iproto_queue *); + struct iproto_request *request; +restart: + while ((request = iproto_queue_pop(i_queue))) { + IprotoRequestGuard guard(request); + fiber_set_session(fiber(), request->session); + request->process(request); + } + /** Put the current fiber into a queue fiber cache. */ fiber_gc(); rlist_add_entry(&i_queue->fiber_cache, fiber(), state); fiber_yield(); + goto restart; } /** Create fibers to handle all outstanding tasks. */ @@ -204,26 +225,22 @@ iproto_queue_schedule(ev_loop * /* loop */, struct ev_async *watcher, f = rlist_shift_entry(&i_queue->fiber_cache, struct fiber, state); else - f = fiber_new("iproto", i_queue->handler); + f = fiber_new("iproto", iproto_queue_handler); fiber_call(f, i_queue); } } static inline void -iproto_queue_init(struct iproto_queue *i_queue, - int size, void (*handler)(va_list)) +iproto_queue_init(struct iproto_queue *i_queue) { - i_queue->size = size; + i_queue->size = IPROTO_REQUEST_QUEUE_SIZE; i_queue->begin = i_queue->end = 0; - i_queue->queue = (struct iproto_request *) calloc(size, - sizeof (struct iproto_request)); /** * Initialize an ev_async event which would start * workers for all outstanding tasks. */ ev_async_init(&i_queue->watcher, iproto_queue_schedule); i_queue->watcher.data = i_queue; - i_queue->handler = handler; rlist_create(&i_queue->fiber_cache); } @@ -260,13 +277,14 @@ struct iproto_connection * Function of the request processor to handle * a single request. */ - box_process_func *handler; struct ev_io input; struct ev_io output; /** Logical session. */ struct session *session; uint64_t cookie; ev_loop *loop; + /* Pre-allocated disconnect request. */ + struct iproto_request *disconnect; }; static struct mempool iproto_connection_pool; @@ -275,7 +293,7 @@ static struct mempool iproto_connection_pool; * A connection is idle when the client is gone * and there are no outstanding requests in the request queue. * An idle connection can be safely garbage collected. - * Note: a connection only becomes idle after iproto_connection_shutdown(), + * Note: a connection only becomes idle after iproto_connection_close(), * which closes the fd. This is why here the check is for * evio_is_active() (false if fd is closed), not ev_is_active() * (false if event is not started). @@ -295,24 +313,13 @@ static void iproto_connection_on_output(ev_loop * /* loop */, struct ev_io *watcher, int /* revents */); -static void -iproto_process_request(struct iproto_request *request); - -static void -iproto_process_connect(struct iproto_request *request); - -static void -iproto_process_disconnect(struct iproto_request *request); - static struct iproto_connection * -iproto_connection_create(const char *name, int fd, struct sockaddr_in *addr, - box_process_func *param) +iproto_connection_new(const char *name, int fd, struct sockaddr_in *addr) { struct iproto_connection *con = (struct iproto_connection *) mempool_alloc(&iproto_connection_pool); con->input.data = con->output.data = con; con->loop = loop(); - con->handler = param; ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ); ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE); con->iobuf[0] = iobuf_new(name); @@ -321,18 +328,22 @@ iproto_connection_create(const char *name, int fd, struct sockaddr_in *addr, con->write_pos = obuf_create_svp(&con->iobuf[0]->out); con->session = NULL; con->cookie = *(uint64_t *) addr; + /* It may be very awkward to allocate at close. */ + con->disconnect = iproto_request_new(con, iproto_process_disconnect); return con; } /** Recycle a connection. Never throws. */ static inline void -iproto_connection_destroy(struct iproto_connection *con) +iproto_connection_delete(struct iproto_connection *con) { assert(iproto_connection_is_idle(con)); assert(!evio_is_active(&con->output)); session_destroy(con->session); /* Never throws. No-op if sid is 0. */ iobuf_delete(con->iobuf[0]); iobuf_delete(con->iobuf[1]); + if (con->disconnect) + mempool_free(&iproto_request_pool, con->disconnect); mempool_free(&iproto_connection_pool, con); } @@ -341,7 +352,6 @@ iproto_connection_shutdown(struct iproto_connection *con) { ev_io_stop(con->loop, &con->input); ev_io_stop(con->loop, &con->output); - close(con->input.fd); con->input.fd = con->output.fd = -1; /* * Discard unparsed data, to recycle the con @@ -357,13 +367,20 @@ iproto_connection_shutdown(struct iproto_connection *con) * twice. */ if (iproto_connection_is_idle(con)) { - struct iproto_request *ireq = iproto_enqueue_request(&request_queue); - iproto_request_init(ireq, con, con->iobuf[0], - iproto_process_disconnect, - 0, 0, 0, 0); + struct iproto_request *ireq = con->disconnect; + con->disconnect = NULL; + iproto_queue_push(&request_queue, ireq); } } +static inline void +iproto_connection_close(struct iproto_connection *con) +{ + int fd = con->input.fd; + iproto_connection_shutdown(con); + close(fd); +} + static inline void iproto_validate_header(uint32_t len) { @@ -449,6 +466,28 @@ iproto_connection_input_iobuf(struct iproto_connection *con) return newbuf; } + +int64_t +subscribe_request_decode(const char *begin, const char *end) +{ + const char *pos = begin; + if (mp_check(&pos, end)) + goto error; + if (mp_typeof(*begin) != MP_MAP) + goto error; + mp_decode_map(&begin); + /* Key */ + if (mp_typeof(*begin) != MP_UINT) + goto error; + mp_decode_uint(&begin); + /* Value */ + if (mp_typeof(*begin) != MP_UINT) + goto error; + return mp_decode_uint(&begin); +error: + tnt_raise(ClientError, ER_INVALID_MSGPACK, "subscribe request body"); +} + static inline void iproto_decode_header(const char **pos, const char *end, uint32_t *keys) { @@ -477,6 +516,29 @@ iproto_decode_header(const char **pos, const char *end, uint32_t *keys) } } +static void +iproto_process_admin(struct iproto_request *ireq, + struct iproto_connection *con, + const char *body, const char *end) +{ + switch (ireq->header[IPROTO_CODE]) { + case IPROTO_PING: + iproto_reply_ping(&ireq->iobuf->out, + ireq->header[IPROTO_SYNC]); + break; + case IPROTO_SUBSCRIBE: + subscribe(con->input.fd, + subscribe_request_decode(body, end)); + tnt_raise(IprotoConnectionShutdown); + default: + tnt_raise(IllegalParams, "unknown request type %u", + ireq->header[IPROTO_CODE]); + } + if (! ev_is_active(&con->output)) + ev_feed_event(con->loop, &con->output, EV_WRITE); +} + + /** Enqueue all requests which were read up. */ static inline void iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) @@ -496,21 +558,31 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) const char *reqend = pos + len; if (reqend > in->end) break; - /* Parse request header. */ - uint32_t header[2] = { 0, 0 }; - iproto_decode_header(&pos, reqend, header); - struct iproto_request *ireq = - iproto_enqueue_request(&request_queue); - iproto_request_init(ireq, con, con->iobuf[0], - iproto_process_request, - header[IPROTO_CODE], header[IPROTO_SYNC], - pos, reqend - pos); + iproto_request_new(con, iproto_process_dml); + IprotoRequestGuard guard(ireq); - /* the request is parsed */ + ireq->header[IPROTO_CODE] = ireq->header[IPROTO_SYNC] = 0; + iproto_decode_header(&pos, reqend, ireq->header); + /* + * sic: in case of exception con->parse_size + * as well as in->pos must not be advanced, to + * stay in sync. + */ + if (iproto_request_is_dml(ireq->header[IPROTO_CODE])) { + request_create(&ireq->request, + ireq->header[IPROTO_CODE]); + request_decode(&ireq->request, pos, reqend - pos); + iproto_queue_push(&request_queue, guard.release()); + /* Request header can be discarded. */ + in->pos += pos - reqstart; + } else { + iproto_process_admin(ireq, con, pos, reqend); + /* Entire request can be discarded. */ + in->pos += reqend - reqstart; + } + /* Request is parsed */ con->parse_size -= reqend - reqstart; - /* request length and header can be discarded. */ - in->pos += pos - reqstart; if (con->parse_size == 0) break; } @@ -541,7 +613,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher, return; } if (nrd == 0) { /* EOF */ - iproto_connection_shutdown(con); + iproto_connection_close(con); return; } /* Update the read position and connection state. */ @@ -555,9 +627,11 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher, */ if (!ev_is_active(&con->input)) ev_feed_event(loop, &con->input, EV_READ); + } catch (IprotoConnectionShutdown *e) { + iproto_connection_shutdown(con); } catch (Exception *e) { e->log(); - iproto_connection_shutdown(con); + iproto_connection_close(con); } } @@ -593,7 +667,7 @@ iproto_flush(struct iobuf *iobuf, int fd, struct obuf_svp *svp) nwr = sio_writev(fd, iov, iovcnt); sio_add_to_iov(iov, svp->iov_len); - } catch (Exception *e) { + } catch (Exception *) { sio_add_to_iov(iov, svp->iov_len); throw; } @@ -632,38 +706,22 @@ iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher, ev_io_stop(loop, &con->output); } catch (Exception *e) { e->log(); - iproto_connection_shutdown(con); + iproto_connection_close(con); } } -/** A handler to process all queued requests. */ -static void -iproto_queue_handler(va_list ap) -{ - struct iproto_queue *i_queue = va_arg(ap, struct iproto_queue *); - struct iproto_request request; -restart: - while (iproto_dequeue_request(i_queue, &request)) { - - fiber_set_session(fiber(), request.connection->session); - request.process(&request); - } - iproto_cache_fiber(&request_queue); - goto restart; -} - /* }}} */ /* {{{ iproto_process_* functions */ static void -iproto_process_request(struct iproto_request *ireq) +iproto_process_dml(struct iproto_request *ireq) { struct iobuf *iobuf = ireq->iobuf; struct iproto_connection *con = ireq->connection; auto scope_guard = make_scoped_guard([=]{ - iobuf->in.pos += ireq->len; + iobuf->in.pos += ireq->request.len; if (evio_is_active(&con->output)) { if (! ev_is_active(&con->output)) @@ -671,7 +729,7 @@ iproto_process_request(struct iproto_request *ireq) &con->output, EV_WRITE); } else if (iproto_connection_is_idle(con)) { - iproto_connection_destroy(con); + iproto_connection_delete(con); } }); @@ -680,24 +738,30 @@ iproto_process_request(struct iproto_request *ireq) struct obuf *out = &iobuf->out; - if (ireq->code == IPROTO_PING) - return iproto_reply_ping(out, ireq->sync); - - /* Make request body point to iproto data */ struct iproto_port port; - iproto_port_init(&port, out, ireq->sync); + iproto_port_init(&port, out, ireq->header[IPROTO_SYNC]); try { - struct request request; - request_create(&request, ireq->code); - request_decode(&request, ireq->body, ireq->len); - (*con->handler)((struct port *) &port, &request); + box_process((struct port *) &port, &ireq->request); } catch (ClientError *e) { if (port.found) obuf_rollback_to_svp(out, &port.svp); - iproto_reply_error(out, e, ireq->sync); + iproto_reply_error(out, e, ireq->header[IPROTO_SYNC]); } } +static struct iproto_request * +iproto_request_new(struct iproto_connection *con, + iproto_request_f process) +{ + struct iproto_request *ireq = + (struct iproto_request *) mempool_alloc(&iproto_request_pool); + ireq->connection = con; + ireq->iobuf = con->iobuf[0]; + ireq->session = con->session; + ireq->process = process; + return ireq; +} + /** * Handshake a connection: invoke the on-connect trigger * and possibly authenticate. Try to send the client an error @@ -712,18 +776,18 @@ iproto_process_connect(struct iproto_request *request) try { /* connect. */ con->session = session_create(fd, con->cookie); } catch (ClientError *e) { - iproto_reply_error(&iobuf->out, e, request->sync); + iproto_reply_error(&iobuf->out, e, request->header[IPROTO_SYNC]); try { iproto_flush(iobuf, fd, &con->write_pos); } catch (Exception *e) { e->log(); } - iproto_connection_shutdown(con); + iproto_connection_close(con); return; } catch (Exception *e) { e->log(); assert(con->session == NULL); - iproto_connection_shutdown(con); + iproto_connection_close(con); return; } /* @@ -740,7 +804,7 @@ static void iproto_process_disconnect(struct iproto_request *request) { /* Runs the trigger, which may yield. */ - iproto_connection_destroy(request->connection); + iproto_connection_delete(request->connection); } /** }}} */ @@ -749,7 +813,7 @@ iproto_process_disconnect(struct iproto_request *request) * Create a connection context and start input. */ static void -iproto_on_accept(struct evio_service *service, int fd, +iproto_on_accept(struct evio_service * /* service */, int fd, struct sockaddr_in *addr) { char name[SERVICE_NAME_MAXLEN]; @@ -757,18 +821,18 @@ iproto_on_accept(struct evio_service *service, int fd, struct iproto_connection *con; - box_process_func *process_fun = - (box_process_func*) service->on_accept_param; - con = iproto_connection_create(name, fd, addr, process_fun); - struct iproto_request *ireq = iproto_enqueue_request(&request_queue); - iproto_request_init(ireq, con, con->iobuf[0], - iproto_process_connect, 0, 0, 0, 0); + con = iproto_connection_new(name, fd, addr); + /* + * Ignore request allocation failure - the queue size is + * fixed so there is a limited number of requests in + * use, all stored in just a few blocks of the memory pool. + */ + struct iproto_request *ireq = + iproto_request_new(con, iproto_process_connect); + iproto_queue_push(&request_queue, ireq); } -/** - * Initialize read-write and read-only ports - * with binary protocol handlers. - */ +/** Initialize a read-write port. */ void iproto_init(const char *bind_ipaddr, int primary_port) { @@ -779,14 +843,16 @@ iproto_init(const char *bind_ipaddr, int primary_port) static struct evio_service primary; evio_service_init(loop(), &primary, "primary", bind_ipaddr, primary_port, - iproto_on_accept, &box_process); + iproto_on_accept, NULL); evio_service_on_bind(&primary, box_leave_local_standby_mode, NULL); evio_service_start(&primary); - iproto_queue_init(&request_queue, IPROTO_REQUEST_QUEUE_SIZE, - iproto_queue_handler); + mempool_create(&iproto_request_pool, &cord()->slabc, + sizeof(struct iproto_request)); + iproto_queue_init(&request_queue); mempool_create(&iproto_connection_pool, &cord()->slabc, sizeof(struct iproto_connection)); } +/* vim: set foldmethod=marker */ diff --git a/src/iproto_constants.h b/src/iproto_constants.h index 344560bdf40fa614079e03b043bf6fda238df3f3..1de71a8f61331b8773609899162c62a525969bbb 100644 --- a/src/iproto_constants.h +++ b/src/iproto_constants.h @@ -81,14 +81,16 @@ iproto_body_has_key(const char *pos, const char *end) extern unsigned char iproto_key_type[IPROTO_KEY_MAX]; enum iproto_request_type { - IPROTO_PING = 0, IPROTO_SELECT = 1, IPROTO_INSERT = 2, IPROTO_REPLACE = 3, IPROTO_UPDATE = 4, IPROTO_DELETE = 5, IPROTO_CALL = 6, - IPROTO_REQUEST_MAX + IPROTO_DML_REQUEST_MAX = 7, + IPROTO_PING = 64, + IPROTO_AUTH = 65, + IPROTO_SUBSCRIBE = 66 }; extern const char *iproto_request_type_strs[]; @@ -96,7 +98,7 @@ extern const char *iproto_request_type_strs[]; static inline const char * iproto_request_name(uint32_t type) { - if (type >= IPROTO_REQUEST_MAX) + if (type >= IPROTO_DML_REQUEST_MAX) return "unknown"; return iproto_request_type_strs[type]; } @@ -107,4 +109,10 @@ iproto_request_is_select(uint32_t type) return type <= IPROTO_SELECT || type == IPROTO_CALL; } +static inline bool +iproto_request_is_dml(uint32_t type) +{ + return type < IPROTO_DML_REQUEST_MAX; +} + #endif /* TARANTOOL_IPROTO_CONSTANTS_H_INCLUDED */ diff --git a/src/iproto_port.cc b/src/iproto_port.cc index 2822aff42c72dbbdc0345bc0f2c03e4d158b5a1b..f8a3f38339e2f88618dafad60af6f17022d891de 100644 --- a/src/iproto_port.cc +++ b/src/iproto_port.cc @@ -71,7 +71,7 @@ iproto_reply_ping(struct obuf *out, uint32_t sync) } void -iproto_reply_error(struct obuf *out, ClientError *e, uint32_t sync) +iproto_reply_error(struct obuf *out, const ClientError *e, uint32_t sync) { uint32_t msg_len = strlen(e->errmsg()); diff --git a/src/iproto_port.h b/src/iproto_port.h index 6ebb9ebf748abca2dd54f3399811a1df2a5fe368..9de89ab8637c0f2f0a62f0b08a6b35974a5d93a8 100644 --- a/src/iproto_port.h +++ b/src/iproto_port.h @@ -84,7 +84,7 @@ iproto_reply_ping(struct obuf *out, uint32_t sync); /** Send an error packet back. */ void -iproto_reply_error(struct obuf *out, ClientError *e, +iproto_reply_error(struct obuf *out, const ClientError *e, uint32_t sync); #endif /* TARANTOOL_IPROTO_PORT_H_INCLUDED */ diff --git a/src/lua/info.cc b/src/lua/info.cc index c2b1a44ade35fe1027c2cd8442ad42ee583e6e95..75acc67e9db37ce4570eb7ef8f28c9d088689dce 100644 --- a/src/lua/info.cc +++ b/src/lua/info.cc @@ -36,7 +36,8 @@ extern "C" { #include <lualib.h> } /* extern "C" */ -#include <recovery.h> +#include "replica.h" +#include "recovery.h" #include "tarantool.h" #include "box/box.h" diff --git a/src/recovery.cc b/src/recovery.cc index 2b1e302166b2c6d32131ead3a4551f6304ff7060..31c0b01f87a7e16f142f059e42de691c10561435 100644 --- a/src/recovery.cc +++ b/src/recovery.cc @@ -38,7 +38,7 @@ #include "errinj.h" #include "bootstrap.h" -#include "replication.h" +#include "replica.h" #include "fiber.h" /* @@ -316,12 +316,9 @@ init_storage_on_replica(struct log_dir *dir, const char *replication_source) say_info("downloading snapshot from master %s...", replication_source); - int master = replica_connect(replication_source); + int master = replica_bootstrap(replication_source); FDGuard guard_master(master); - uint32_t request = RPL_GET_SNAPSHOT; - sio_writen(master, &request, sizeof(request)); - struct { uint64_t lsn; uint64_t file_size; @@ -1232,7 +1229,7 @@ wal_write(struct recovery_state *r, int64_t lsn, uint64_t cookie, /* }}} */ -/* {{{ SAVE SNAPSHOT and tarantool_box --cat */ +/* {{{ box.snapshot() */ void snapshot_write_row(struct log_io *l, diff --git a/src/recovery.h b/src/recovery.h index c24f98e67b2f9fe66830f7eb984f27f0e345bed0..03b31cdfad10390481ce5538718c8f24637fe771 100644 --- a/src/recovery.h +++ b/src/recovery.h @@ -29,7 +29,6 @@ * SUCH DAMAGE. */ #include <stdbool.h> -#include <netinet/in.h> #include "trivia/util.h" #include "tarantool_ev.h" @@ -64,17 +63,7 @@ wait_lsn_clear(struct wait_lsn *wait_lsn) struct wal_writer; struct wal_watcher; - -enum { REMOTE_SOURCE_MAXLEN = 32 }; - -/** Master connection */ -struct remote { - struct sockaddr_in addr; - struct fiber *reader; - uint64_t cookie; - ev_tstamp recovery_lag, recovery_last_update_tstamp; - char source[REMOTE_SOURCE_MAXLEN]; -}; +struct remote; enum wal_mode { WAL_NONE = 0, WAL_WRITE, WAL_FSYNC, WAL_FSYNC_DELAY, WAL_MODE_MAX }; @@ -133,22 +122,6 @@ void set_lsn(struct recovery_state *r, int64_t lsn); void recovery_wait_lsn(struct recovery_state *r, int64_t lsn); -void recovery_follow_remote(struct recovery_state *r, const char *addr); -void recovery_stop_remote(struct recovery_state *r); - -void recovery_follow_remote_1_5(struct recovery_state *r, const char *addr); -void recovery_stop_remote_1_5(struct recovery_state *r); - -/** - * The replication protocol is request/response. The - * replica saends a request, and the master responds with - * appropriate data. - */ -enum rpl_request_type { - RPL_GET_WAL = 0, - RPL_GET_SNAPSHOT -}; - struct fio_batch; void snapshot_write_row(struct log_io *i, diff --git a/src/replica.cc b/src/replica.cc index aeaa75d09d4aedae22b91fda8eca0f804514a5e7..f627d131d504d86fa6679558ebabe1d5ed6babc0 100644 --- a/src/replica.cc +++ b/src/replica.cc @@ -39,6 +39,9 @@ #include "coio_buf.h" #include "recovery.h" #include "tarantool.h" +#include "iproto_constants.h" +#include "msgpuck/msgpuck.h" +#include "replica.h" static void remote_apply_row(struct recovery_state *r, const struct log_row *row); @@ -66,6 +69,53 @@ remote_read_row(struct ev_io *coio, struct iobuf *iobuf) return row; } +struct iproto_subscribe_request { + uint8_t v_len; /* length */ + uint8_t m_header; /* MP_MAP */ + uint8_t k_code; /* IPROTO_CODE */ + uint8_t v_code; /* response status */ + uint8_t m_body; /* MP_MAP */ + uint8_t k_data; /* IPROTO_OFFSET */ + uint8_t m_data; /* MP_UINT64 */ + uint64_t lsn; /* lsn */ +} __attribute__((packed)); + +static const struct iproto_subscribe_request iproto_subscribe_request = { + sizeof(struct iproto_subscribe_request) - 1, + 0x81, IPROTO_CODE, IPROTO_SUBSCRIBE, + 0x81, IPROTO_OFFSET, 0xcf, 0 +}; + +int +replica_bootstrap(const char *replication_source) +{ + char ip_addr[32]; + int port; + struct sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + + int rc = sscanf(replication_source, "%31[^:]:%i", + ip_addr, &port); + + assert(rc == 2); + (void)rc; + + addr.sin_family = AF_INET; + if (inet_aton(ip_addr, (in_addr*)&addr.sin_addr.s_addr) < 0) + panic_syserror("inet_aton: %s", ip_addr); + + addr.sin_port = htons(port); + + int master = sio_socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + FDGuard guard(master); + sio_connect(master, &addr, sizeof(addr)); + sio_write(master, &iproto_subscribe_request, + sizeof(iproto_subscribe_request)); + + guard.fd = -1; + return master; +} + static void remote_connect(struct ev_io *coio, struct sockaddr_in *remote_addr, int64_t initial_lsn, const char **err) @@ -75,18 +125,9 @@ remote_connect(struct ev_io *coio, struct sockaddr_in *remote_addr, *err = "can't connect to master"; coio_connect(coio, remote_addr); - uint32_t greeting[3] = { xlog_format, tarantool_version_id(), 0 }; - uint32_t master_greeting[3]; - coio_write(coio, greeting, sizeof(greeting)); - coio_readn(coio, master_greeting, sizeof(master_greeting)); - if (master_greeting[0] != greeting[0]) - tnt_raise(IllegalParams, "master has unknown log format"); - - struct send_request { - uint32_t request_type; - int64_t initial_lsn; - } __attribute__((packed)) send_request = { RPL_GET_WAL, initial_lsn }; - coio_write(coio, &send_request, sizeof(send_request)); + struct iproto_subscribe_request request = iproto_subscribe_request; + request.lsn = mp_bswap_u64(initial_lsn); + coio_write(coio, &request, sizeof(request)); say_crit("successfully connected to master"); say_crit("starting replication from lsn: %" PRIi64, initial_lsn); diff --git a/src/replica.h b/src/replica.h new file mode 100644 index 0000000000000000000000000000000000000000..0820f7deddeda2525fa66bc9321621f978c89ff2 --- /dev/null +++ b/src/replica.h @@ -0,0 +1,60 @@ +#ifndef TARANTOOL_REPLICA_H_INCLUDED +#define TARANTOOL_REPLICA_H_INCLUDED +/* + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include <netinet/in.h> +#include "tarantool_ev.h" + +enum { REMOTE_SOURCE_MAXLEN = 32 }; + +/** Master connection */ +struct remote { + struct sockaddr_in addr; + struct fiber *reader; + uint64_t cookie; + ev_tstamp recovery_lag, recovery_last_update_tstamp; + char source[REMOTE_SOURCE_MAXLEN]; +}; + +/** Connect to a master and request a snapshot. + * Raises an exception on error. + * + * @return A connected socket, ready too receive + * data. + */ +int +replica_bootstrap(const char *replication_source); + +void +recovery_follow_remote(struct recovery_state *r, const char *addr); + +void +recovery_stop_remote(struct recovery_state *r); + +#endif /* TARANTOOL_REPLICA_H_INCLUDED */ diff --git a/src/replication.cc b/src/replication.cc index fbd4f3cefe019a6950222695aa4b7f29e762bed8..0a2eb143e10317a48ad37603f1dbc8e33c5d329a 100644 --- a/src/replication.cc +++ b/src/replication.cc @@ -60,11 +60,9 @@ extern "C" { * with the spawner using a socketpair(2). Replication relays are * created by the spawner and handle one client connection each. * - * The master process binds to replication_port and accepts + * The master process binds to the primary port and accepts * incoming connections. This is done in the master to be able to - * correctly handle RELOAD CONFIGURATION, which happens in the - * master, and, in future, perform authentication of replication - * clients. + * correctly handle authentication of replication clients. * * Once a client socket is accepted, it is sent to the spawner * process, through the master's end of the socket pair. @@ -79,12 +77,16 @@ extern "C" { */ static int master_to_spawner_socket; -/** Accept a new connection on the replication port: push the accepted socket - * to the spawner. +/** + * State of a replica. We only need one global instance + * since we fork() for every replica. */ -static void -replication_on_accept(struct evio_service *service __attribute__((unused)), - int fd, struct sockaddr_in *addr __attribute__((unused))); +struct replica { + /** Replica connection */ + int sock; + /** Initial lsn. */ + int64_t lsn; +} replica; /** Send a file descriptor to replication relay spawner. * @@ -131,7 +133,7 @@ spawner_sigchld_handler(int signal __attribute__((unused))); * @return 0 on success, -1 on error */ static int -spawner_create_replication_relay(int client_sock); +spawner_create_replication_relay(); /** Shut down all relays when shutting down the spawner. */ static void @@ -139,7 +141,7 @@ spawner_shutdown_children(); /** Initialize replication relay process. */ static void -replication_relay_loop(int client_sock); +replication_relay_loop(); /* * ------------------------------------------------------------------------ @@ -147,28 +149,10 @@ replication_relay_loop(int client_sock); * ------------------------------------------------------------------------ */ -/** Check replication module configuration. */ -int -replication_check_config(struct tarantool_cfg *config) -{ - if (config->replication_port < 0 || - config->replication_port >= USHRT_MAX) { - say_error("invalid replication port value: %" PRId32, - config->replication_port); - return -1; - } - - return 0; -} - /** Pre-fork replication spawner process. */ void replication_prefork() { - if (cfg.replication_port == 0) { - /* replication is not needed, do nothing */ - return; - } int sockpair[2]; /* * Create UNIX sockets to communicate between the main and @@ -201,51 +185,33 @@ replication_prefork() } } -/** - * Create a fiber which accepts client connections and pushes them - * to replication spawner. - */ - -void -replication_init(const char *bind_ipaddr, int replication_port) -{ - if (replication_port == 0) - return; /* replication is not in use */ - - static struct evio_service replication; - - evio_service_init(loop(), &replication, "replication", bind_ipaddr, - replication_port, replication_on_accept, NULL); - - evio_service_start(&replication); -} - - /*-----------------------------------------------------------------------------*/ /* replication accept/sender fibers */ /*-----------------------------------------------------------------------------*/ +/** State of subscribe request - master process. */ +struct subscribe_request { + struct ev_io io; + int fd; + int64_t lsn; +}; + /** Replication acceptor fiber handler. */ -static void -replication_on_accept(struct evio_service *service, - int fd, - struct sockaddr_in *addr __attribute__((unused))) +void +subscribe(int fd, int64_t lsn) { - /* - * Drop the O_NONBLOCK flag, which was possibly - * inherited from the acceptor fd (happens on - * Darwin). - */ - sio_setfl(fd, O_NONBLOCK, 0); - - struct ev_io *io = (struct ev_io *) malloc(sizeof(struct ev_io)); - if (io == NULL) { + struct subscribe_request *request = (struct subscribe_request *) + malloc(sizeof(struct subscribe_request)); + if (request == NULL) { close(fd); return; } - io->data = (void *) (intptr_t) fd; - ev_io_init(io, replication_send_socket, master_to_spawner_socket, EV_WRITE); - ev_io_start(service->loop, io); + request->fd = fd; + request->io.data = request; + request->lsn = lsn; + ev_io_init(&request->io, replication_send_socket, + master_to_spawner_socket, EV_WRITE); + ev_io_start(loop(), &request->io); } @@ -253,21 +219,21 @@ replication_on_accept(struct evio_service *service, static void replication_send_socket(ev_loop *loop, ev_io *watcher, int /* events */) { - int client_sock = (intptr_t) watcher->data; + struct subscribe_request *request = + (struct subscribe_request *) watcher->data; struct msghdr msg; - struct iovec iov[1]; + struct iovec iov; char control_buf[CMSG_SPACE(sizeof(int))]; struct cmsghdr *control_message = NULL; - int cmd_code = 0; - iov[0].iov_base = &cmd_code; - iov[0].iov_len = sizeof(cmd_code); + iov.iov_base = &request->lsn; + iov.iov_len = sizeof(request->lsn); memset(&msg, 0, sizeof(msg)); msg.msg_name = NULL; msg.msg_namelen = 0; - msg.msg_iov = iov; + msg.msg_iov = &iov; msg.msg_iovlen = 1; msg.msg_control = control_buf; msg.msg_controllen = sizeof(control_buf); @@ -276,62 +242,19 @@ replication_send_socket(ev_loop *loop, ev_io *watcher, int /* events */) control_message->cmsg_len = CMSG_LEN(sizeof(int)); control_message->cmsg_level = SOL_SOCKET; control_message->cmsg_type = SCM_RIGHTS; - *((int *) CMSG_DATA(control_message)) = client_sock; + *((int *) CMSG_DATA(control_message)) = request->fd; /* Send the client socket to the spawner. */ if (sendmsg(master_to_spawner_socket, &msg, 0) < 0) say_syserror("sendmsg"); ev_io_stop(loop, watcher); - free(watcher); /* Close client socket in the main process. */ - close(client_sock); + close(request->fd); + free(request); } -void -replication_handshake(int fd, const char *who) -{ - uint32_t greeting[3] = { xlog_format, tarantool_version_id(), 0}; - uint32_t replica_greeting[3] = { 0 }; - sio_writen(fd, greeting, sizeof(greeting)); - sio_readn(fd, replica_greeting, sizeof(replica_greeting)); - if (replica_greeting[0] != greeting[0]) { - say_error("unsupported %s xlog format %d", - who, replica_greeting[0]); - panic("handshake failed"); - } -} - -int -replica_connect(const char *replication_source) -{ - char ip_addr[32]; - int port; - struct sockaddr_in addr; - memset(&addr, 0, sizeof(addr)); - - int rc = sscanf(replication_source, "%31[^:]:%i", - ip_addr, &port); - - assert(rc == 2); - (void)rc; - - addr.sin_family = AF_INET; - if (inet_aton(ip_addr, (in_addr*)&addr.sin_addr.s_addr) < 0) - panic_syserror("inet_aton: %s", ip_addr); - - addr.sin_port = htons(port); - - int master = sio_socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); - FDGuard guard(master); - sio_connect(master, &addr, sizeof(addr)); - - replication_handshake(master, "master"); - guard.fd = -1; - return master; -} - /*--------------------------------------------------------------------------* * spawner process * * -------------------------------------------------------------------------*/ @@ -396,8 +319,6 @@ spawner_init(int sock) spawner_main_loop(); } - - static int spawner_unpack_cmsg(struct msghdr *msg) { @@ -417,17 +338,15 @@ static void spawner_main_loop() { struct msghdr msg; - struct iovec iov[1]; + struct iovec iov; char control_buf[CMSG_SPACE(sizeof(int))]; - int cmd_code = 0; - int client_sock; - iov[0].iov_base = &cmd_code; - iov[0].iov_len = sizeof(cmd_code); + iov.iov_base = &replica.lsn; + iov.iov_len = sizeof(replica.lsn); msg.msg_name = NULL; msg.msg_namelen = 0; - msg.msg_iov = iov; + msg.msg_iov = &iov; msg.msg_iovlen = 1; msg.msg_control = control_buf; msg.msg_controllen = sizeof(control_buf); @@ -435,8 +354,8 @@ spawner_main_loop() while (!spawner.killed) { int msglen = recvmsg(spawner.sock, &msg, 0); if (msglen > 0) { - client_sock = spawner_unpack_cmsg(&msg); - spawner_create_replication_relay(client_sock); + replica.sock = spawner_unpack_cmsg(&msg); + spawner_create_replication_relay(); } else if (msglen == 0) { /* orderly master shutdown */ say_info("Exiting: master shutdown"); break; @@ -503,7 +422,7 @@ spawner_sigchld_handler(int signo __attribute__((unused))) /** Create replication client handler process. */ static int -spawner_create_replication_relay(int client_sock) +spawner_create_replication_relay() { pid_t pid = fork(); @@ -516,10 +435,10 @@ spawner_create_replication_relay(int client_sock) ev_loop_fork(loop()); ev_run(loop(), EVRUN_NOWAIT); close(spawner.sock); - replication_relay_loop(client_sock); + replication_relay_loop(); } else { spawner.child_count++; - close(client_sock); + close(replica.sock); say_info("created a replication relay: pid = %d", (int) pid); } @@ -603,10 +522,10 @@ spawner_shutdown_children() static void replication_relay_recv(ev_loop * /* loop */, struct ev_io *w, int __attribute__((unused)) revents) { - int client_sock = (int) (intptr_t) w->data; + int replica_sock = (int) (intptr_t) w->data; uint8_t data; - int rc = recv(client_sock, &data, sizeof(data), 0); + int rc = recv(replica_sock, &data, sizeof(data), 0); if (rc == 0 || (rc < 0 && errno == ECONNRESET)) { say_info("the client has closed its replication socket, exiting"); @@ -621,12 +540,11 @@ replication_relay_recv(ev_loop * /* loop */, struct ev_io *w, int __attribute__( /** Send a single row to the client. */ static int -replication_relay_send_row(void *param, const struct log_row *row) +replication_relay_send_row(void * /* param */, const struct log_row *row) { - int client_sock = (int) (intptr_t) param; ssize_t bytes, len = log_row_size(row); while (len > 0) { - bytes = write(client_sock, row, len); + bytes = write(replica.sock, row, len); if (bytes < 0) { if (errno == EPIPE) { /* socket closed on opposite site */ @@ -645,9 +563,9 @@ replication_relay_send_row(void *param, const struct log_row *row) } static void -replication_relay_send_snapshot(int client_sock) +replication_relay_send_snapshot() { - FDGuard guard_replica(client_sock); + FDGuard guard_replica(replica.sock); struct log_dir dir = snap_dir; dir.dirname = cfg.snap_dir; int64_t lsn = greatest_lsn(&dir); @@ -665,18 +583,17 @@ replication_relay_send_snapshot(int client_sock) uint64_t header[2]; header[0] = lsn; header[1] = st.st_size; - sio_writen(client_sock, header, sizeof(header)); - sio_sendfile(client_sock, snapshot, NULL, header[1]); + sio_writen(replica.sock, header, sizeof(header)); + sio_sendfile(replica.sock, snapshot, NULL, header[1]); exit(EXIT_SUCCESS); } /** The main loop of replication client service process. */ static void -replication_relay_loop(int client_sock) +replication_relay_loop() { struct sigaction sa; - int64_t lsn; /* Set process title and fiber name. * Even though we use only the main fiber, the logger @@ -684,7 +601,7 @@ replication_relay_loop(int client_sock) */ struct sockaddr_in peer; socklen_t addrlen = sizeof(peer); - getpeername(client_sock, ((struct sockaddr*)&peer), &addrlen); + getpeername(replica.sock, ((struct sockaddr*)&peer), &addrlen); title("relay", "%s", sio_strfaddr(&peer)); fiber_set_name(fiber(), status); @@ -716,41 +633,33 @@ replication_relay_loop(int client_sock) say_syserror("sigaction"); } - replication_handshake(client_sock, "replica"); - uint32_t request; - sio_readn(client_sock, &request, sizeof(request)); - if (request == RPL_GET_SNAPSHOT) { - replication_relay_send_snapshot(client_sock); /* exits */ - } - if (request != RPL_GET_WAL) { - say_error("unknown replica request: %d", request); - exit(EXIT_FAILURE); - } - sio_readn(client_sock, &lsn, sizeof(lsn)); - + if (replica.lsn == 0) + replication_relay_send_snapshot(); /* exits */ /* * Init a read event: when replica closes its end * of the socket, we can read EOF and shutdown the * relay. */ struct ev_io sock_read_ev; - sock_read_ev.data = (void *)(intptr_t) client_sock; - ev_io_init(&sock_read_ev, replication_relay_recv, client_sock, EV_READ); + sock_read_ev.data = (void *)(intptr_t) replica.sock; + ev_io_init(&sock_read_ev, replication_relay_recv, + replica.sock, EV_READ); ev_io_start(loop(), &sock_read_ev); /* Initialize the recovery process */ recovery_init(cfg.snap_dir, cfg.wal_dir, - replication_relay_send_row, (void *)(intptr_t) client_sock, - INT32_MAX); + replication_relay_send_row, + NULL, INT32_MAX); /* * Note that recovery starts with lsn _NEXT_ to * the confirmed one. */ - recovery_state->lsn = recovery_state->confirmed_lsn = lsn - 1; + recovery_state->lsn = recovery_state->confirmed_lsn = replica.lsn - 1; recover_existing_wals(recovery_state); /* Found nothing. */ - if (recovery_state->lsn == lsn - 1) - say_error("can't find WAL containing record with lsn: %" PRIi64, lsn); + if (recovery_state->lsn == replica.lsn - 1) + say_error("can't find WAL containing record with lsn: %" PRIi64, + replica.lsn); recovery_follow_local(recovery_state, 0.1); ev_run(loop(), 0); diff --git a/src/replication.h b/src/replication.h index 5ada8927e0aa68c557edfed3cbf312e7f45bd7ac..40ecc01adfbcc8a8643b9cb345405cbece2e320a 100644 --- a/src/replication.h +++ b/src/replication.h @@ -31,16 +31,6 @@ #include <tarantool.h> #include "trivia/util.h" -/** - * Check replication configuration. - * - * @param config config file to check. - * - * @return 0 on success, -1 on error - */ -int -replication_check_config(struct tarantool_cfg *config); - /** * Pre-fork replication spawner process. * @@ -50,20 +40,12 @@ void replication_prefork(); /** - * Initialize replication module. + * Subscribe a replica to updates. * - * @return None. Panics and exits on error. + * @return None. On error, closes the socket. */ void -replication_init(const char *bind_ipaddr, int replication_port); - -/** Connect to a master and perform an initial handshake. - * Raises an exception on error. - * - * @return A connected socket - */ -int -replica_connect(const char *replication_source); +subscribe(int fd, int64_t lsn); #endif // TARANTOOL_REPLICATION_H_INCLUDED diff --git a/src/tarantool.cc b/src/tarantool.cc index 6309306f5e40e425c993c518ae3d72d52fd6e629..532d2b565394636067e72a132038e69088f689b2 100644 --- a/src/tarantool.cc +++ b/src/tarantool.cc @@ -115,9 +115,9 @@ title(const char *role, const char *fmt, ...) bufptr += snprintf(bufptr, bufend - bufptr, "%s", s); } - int ports[] = { cfg.primary_port, cfg.admin_port, cfg.replication_port }; + int ports[] = { cfg.primary_port, cfg.admin_port }; int *pptr = ports; - const char *names[] = { "pri", "adm", "rpl", NULL }; + const char *names[] = { "pri", "adm", NULL }; const char **nptr = names; for (; *nptr; nptr++, pptr++) @@ -164,9 +164,6 @@ load_cfg(struct tarantool_cfg *conf, int32_t check_rdonly) if (check_cfg_tarantool_cfg(conf) != 0) return -1; - if (replication_check_config(conf) != 0) - return -1; - return box_check_config(conf); } @@ -802,7 +799,6 @@ main(int argc, char **argv) int events = ev_activecnt(loop()); iproto_init(cfg.bind_ipaddr, cfg.primary_port); admin_init(cfg.bind_ipaddr, cfg.admin_port); - replication_init(cfg.bind_ipaddr, cfg.replication_port); session_init(); /* * Load user init script. The script should have access diff --git a/test/box/admin.result b/test/box/admin.result index 8c4b4881fa6c775bc9f1738d58c307063ac12e68..6a8cf5c744b7d2f02b1f9f5f2b13ea0e236d19e0 100644 --- a/test/box/admin.result +++ b/test/box/admin.result @@ -54,21 +54,20 @@ box.cfg() snap_io_rate_limit: 0 log_level: 5 logger_nonblock: true - readahead: 16320 snap_dir: . coredump: false + primary_port: <number> + slab_alloc_arena: 0.1 wal_dir: . - too_long_threshold: 0.5 - rows_per_wal: 50 wal_mode: fsync_delay - slab_alloc_arena: 0.1 + readahead: 16320 panic_on_snap_error: true panic_on_wal_error: false + rows_per_wal: 50 local_hot_standby: false - replication_port: 0 bind_ipaddr: INADDR_ANY wal_fsync_delay: 0 - primary_port: <number> + too_long_threshold: 0.5 wal_dir_rescan_delay: 0.1 ... box.stat() diff --git a/test/box/cfg.result b/test/box/cfg.result index 8733d27902cfce86d6ad4f443060cb5d2541e4ed..de1d45423879e35979bfbb65d291aff6fd86a492 100644 --- a/test/box/cfg.result +++ b/test/box/cfg.result @@ -16,24 +16,23 @@ t - 'slab_alloc_minimal: 64' - 'admin_port: <number> - 'logger: cat - >> tarantool.log' - - 'readahead: 16320' + - 'snap_io_rate_limit: 0' - 'log_level: 5' - - 'rows_per_wal: 50' - 'logger_nonblock: true' - - 'too_long_threshold: 0.5' + - 'wal_dir: .' - 'snap_dir: .' - 'coredump: false' + - 'too_long_threshold: 0.5' - 'primary_port: <number> - 'panic_on_wal_error: false' - - 'snap_io_rate_limit: 0' - 'wal_mode: fsync_delay' - - 'slab_alloc_arena: 0.1' + - 'readahead: 16320' - 'panic_on_snap_error: true' - 'local_hot_standby: false' - - 'replication_port: 0' + - 'rows_per_wal: 50' - 'bind_ipaddr: INADDR_ANY' - 'wal_fsync_delay: 0' - - 'wal_dir: .' + - 'slab_alloc_arena: 0.1' - 'wal_dir_rescan_delay: 0.1' ... -- must be read-only @@ -52,24 +51,23 @@ t - 'slab_alloc_minimal: 64' - 'admin_port: <number> - 'logger: cat - >> tarantool.log' - - 'readahead: 16320' + - 'snap_io_rate_limit: 0' - 'log_level: 5' - - 'rows_per_wal: 50' - 'logger_nonblock: true' - - 'too_long_threshold: 0.5' + - 'wal_dir: .' - 'snap_dir: .' - 'coredump: false' + - 'too_long_threshold: 0.5' - 'primary_port: <number> - 'panic_on_wal_error: false' - - 'snap_io_rate_limit: 0' - 'wal_mode: fsync_delay' - - 'slab_alloc_arena: 0.1' + - 'readahead: 16320' - 'panic_on_snap_error: true' - 'local_hot_standby: false' - - 'replication_port: 0' + - 'rows_per_wal: 50' - 'bind_ipaddr: INADDR_ANY' - 'wal_fsync_delay: 0' - - 'wal_dir: .' + - 'slab_alloc_arena: 0.1' - 'wal_dir_rescan_delay: 0.1' ... --# clear filter diff --git a/test/box/configuration.result b/test/box/configuration.result index 6ff79f4b2bafcb3bb769e6b7e12aaa4955ecb1e5..8c91826cea92bd063cd2fbdf18f6e6a36efd27ef 100644 --- a/test/box/configuration.result +++ b/test/box/configuration.result @@ -18,21 +18,20 @@ print_config() snap_io_rate_limit: 0 log_level: 5 logger_nonblock: true - readahead: 16320 snap_dir: . coredump: false + primary_port: <number> + slab_alloc_arena: 0.1 wal_dir: . - too_long_threshold: 0.5 - rows_per_wal: 50 wal_mode: fsync_delay - slab_alloc_arena: 0.1 + readahead: 16320 panic_on_snap_error: true panic_on_wal_error: false + rows_per_wal: 50 local_hot_standby: false - replication_port: 0 bind_ipaddr: INADDR_ANY wal_fsync_delay: 0 - primary_port: <number> + too_long_threshold: 0.5 wal_dir_rescan_delay: 0.1 ... diff --git a/test/box/net.box.result b/test/box/net.box.result index 3747572dbd6f593a317f6325a498c4f1287b5f10..9a89eee9ab720d5ee6a4fb2f8b38e1a9d08379ca 100644 --- a/test/box/net.box.result +++ b/test/box/net.box.result @@ -374,11 +374,11 @@ remote:close() ... remote:close() --- -- error: '[string "-- box_net.lua (internal file)..."]:507: box.net.box: already closed' +- error: '[string "-- box_net.lua (internal file)..."]:506: box.net.box: already closed' ... remote:ping() --- -- error: '[string "-- box_net.lua (internal file)..."]:512: box.net.box: connection +- error: '[string "-- box_net.lua (internal file)..."]:511: box.net.box: connection was closed' ... space:drop() diff --git a/test/box/socket.result b/test/box/socket.result index 173c32f1a2caba3c35b76e165862260d8ea04f99..82b40765dc925d19e9a12f2df7db1da48f623535 100644 --- a/test/box/socket.result +++ b/test/box/socket.result @@ -899,7 +899,7 @@ ping s:close() --- ... - replies = 0 packet = msgpack.encode({[0] = 0, [1] = 0}) packet = msgpack.encode(packet:len())..packet function bug1160869() local s = box.socket.tcp() s:connect('127.0.0.1', box.cfg.primary_port) box.fiber.resume( box.fiber.create(function() box.fiber.detach() while true do _, status = s:recv(18) if status == "eof" then error("unexpected eof") end replies = replies + 1 end end) ) return s:send(packet) end + replies = 0 packet = msgpack.encode({[0] = 64, [1] = 0}) packet = msgpack.encode(packet:len())..packet function bug1160869() local s = box.socket.tcp() s:connect('127.0.0.1', box.cfg.primary_port) box.fiber.resume( box.fiber.create(function() box.fiber.detach() while true do _, status = s:recv(18) if status == "eof" then error("unexpected eof") end replies = replies + 1 end end) ) return s:send(packet) end --- ... bug1160869() @@ -921,7 +921,7 @@ replies --- - 3 ... - s = nil syncno = 0 reps = 0 packet = msgpack.encode({[0] = 0, [1] = 0}) packet = msgpack.encode(packet:len())..packet function iostart() if s ~= nil then return end s = box.socket.tcp() s:connect('127.0.0.1', box.cfg.primary_port) box.fiber.resume( box.fiber.create(function() box.fiber.detach() while true do s:recv(18) if status == "eof" then error("unexpected eof") end reps = reps + 1 end end)) end function iotest() iostart() syncno = syncno + 1 packet = msgpack.encode({[0] = 0, [1] = syncno}) packet = msgpack.encode(packet:len())..packet return s:send(packet) end + s = nil syncno = 0 reps = 0 packet = msgpack.encode({[0] = 64, [1] = 0}) packet = msgpack.encode(packet:len())..packet function iostart() if s ~= nil then return end s = box.socket.tcp() s:connect('127.0.0.1', box.cfg.primary_port) box.fiber.resume( box.fiber.create(function() box.fiber.detach() while true do s:recv(18) if status == "eof" then error("unexpected eof") end reps = reps + 1 end end)) end function iotest() iostart() syncno = syncno + 1 packet = msgpack.encode({[0] = 64, [1] = syncno}) packet = msgpack.encode(packet:len())..packet return s:send(packet) end --- ... iotest() diff --git a/test/box/socket.test.py b/test/box/socket.test.py index 6a025e1686d81139a35420b8cad07334cd6ffe34..c5862682e82378c577ebe9d9a5f28e7dae6ae76d 100644 --- a/test/box/socket.test.py +++ b/test/box/socket.test.py @@ -511,7 +511,7 @@ admin("s:close()") # test=""" replies = 0 -packet = msgpack.encode({[0] = 0, [1] = 0}) +packet = msgpack.encode({[0] = 64, [1] = 0}) packet = msgpack.encode(packet:len())..packet function bug1160869() local s = box.socket.tcp() @@ -541,7 +541,7 @@ test=""" s = nil syncno = 0 reps = 0 -packet = msgpack.encode({[0] = 0, [1] = 0}) +packet = msgpack.encode({[0] = 64, [1] = 0}) packet = msgpack.encode(packet:len())..packet function iostart() if s ~= nil then @@ -564,7 +564,7 @@ end function iotest() iostart() syncno = syncno + 1 - packet = msgpack.encode({[0] = 0, [1] = syncno}) + packet = msgpack.encode({[0] = 64, [1] = syncno}) packet = msgpack.encode(packet:len())..packet return s:send(packet) end diff --git a/test/connector_c/cfg/master.cfg b/test/connector_c/cfg/master.cfg index d25bb2038e3b47118ab104b76b6d2f5c15eaf0ea..59dc00f297d0c2b7cde75bea6649bc03ec02c75c 100644 --- a/test/connector_c/cfg/master.cfg +++ b/test/connector_c/cfg/master.cfg @@ -5,6 +5,5 @@ logger="cat - >> tarantool.log" primary_port = 33013 admin_port = 33015 -replication_port = 33016 rows_per_wal = 50 diff --git a/test/lib/tarantool-python b/test/lib/tarantool-python index 515b2ff325870b70cb76342fdb4f55a8ac72da3a..ef3a3b444b9a4e8a90bb5f631687b13f5890048b 160000 --- a/test/lib/tarantool-python +++ b/test/lib/tarantool-python @@ -1 +1 @@ -Subproject commit 515b2ff325870b70cb76342fdb4f55a8ac72da3a +Subproject commit ef3a3b444b9a4e8a90bb5f631687b13f5890048b