diff --git a/cfg/tarantool_box_cfg.c b/cfg/tarantool_box_cfg.c index deea2030808b49a6f11e48e380d5c8dd2a17ba8c..9ac4a967e7c273ee12020ff76822b4b8b010cf43 100644 --- a/cfg/tarantool_box_cfg.c +++ b/cfg/tarantool_box_cfg.c @@ -24,8 +24,52 @@ cmpNameAtoms(NameAtom *a, NameAtom *b) { return (a == NULL && b == NULL) ? 1 : 0; } +void +init_tarantool_cfg(tarantool_cfg *c) { + c->__confetti_flags = 0; + + c->username = NULL; + c->coredump = 0; + c->admin_port = 0; + c->log_level = 0; + c->slab_alloc_arena = 0; + c->slab_alloc_minimal = 0; + c->slab_alloc_factor = 0; + c->work_dir = NULL; + c->pid_file = NULL; + c->logger = NULL; + c->logger_nonblock = 0; + c->io_collect_interval = 0; + c->backlog = 0; + c->readahead = 0; + c->snap_dir = NULL; + c->wal_dir = NULL; + c->primary_port = 0; + c->secondary_port = 0; + c->too_long_threshold = 0; + c->custom_proc_title = NULL; + c->memcached = 0; + c->memcached_namespace = 0; + c->memcached_expire_per_loop = 0; + c->memcached_expire_full_sweep = 0; + c->snap_io_rate_limit = 0; + c->rows_per_wal = 0; + c->wal_fsync_delay = 0; + c->wal_writer_inbox_size = 0; + c->local_hot_standby = 0; + c->wal_dir_rescan_delay = 0; + c->panic_on_snap_error = 0; + c->panic_on_wal_error = 0; + c->remote_hot_standby = 0; + c->wal_feeder_ipaddr = NULL; + c->wal_feeder_port = 0; + c->namespace = NULL; +} + int fill_default_tarantool_cfg(tarantool_cfg *c) { + c->__confetti_flags = 0; + c->username = NULL; c->coredump = 0; c->admin_port = 0; @@ -68,6 +112,13 @@ fill_default_tarantool_cfg(tarantool_cfg *c) { return 0; } +void +swap_tarantool_cfg(struct tarantool_cfg *c1, struct tarantool_cfg *c2) { + struct tarantool_cfg tmpcfg = *c1; + *c1 = *c2; + *c2 = tmpcfg; +} + static int acceptDefault_name__namespace(tarantool_cfg_namespace *c) { c->enabled = -1; @@ -692,8 +743,6 @@ acceptValue(tarantool_cfg* c, OptDef* opt, int check_rdonly) { return CNF_WRONGINT; if ( (i32 == LONG_MIN || i32 == LONG_MAX) && errno == ERANGE) return CNF_WRONGRANGE; - if (check_rdonly && c->remote_hot_standby != i32) - return CNF_RDONLY; c->remote_hot_standby = i32; } else if ( cmpNameAtoms( opt->name, _name__wal_feeder_ipaddr) ) { @@ -701,8 +750,6 @@ acceptValue(tarantool_cfg* c, OptDef* opt, int check_rdonly) { return CNF_WRONGTYPE; c->__confetti_flags &= ~CNF_FLAG_STRUCT_NOTSET; errno = 0; - if (check_rdonly && ( (opt->paramValue.stringval == NULL && c->wal_feeder_ipaddr == NULL) || strcmp(opt->paramValue.stringval, c->wal_feeder_ipaddr) != 0)) - return CNF_RDONLY; c->wal_feeder_ipaddr = (opt->paramValue.stringval) ? strdup(opt->paramValue.stringval) : NULL; if (opt->paramValue.stringval && c->wal_feeder_ipaddr == NULL) return CNF_NOMEMORY; @@ -717,8 +764,6 @@ acceptValue(tarantool_cfg* c, OptDef* opt, int check_rdonly) { return CNF_WRONGINT; if ( (i32 == LONG_MIN || i32 == LONG_MAX) && errno == ERANGE) return CNF_WRONGRANGE; - if (check_rdonly && c->wal_feeder_port != i32) - return CNF_RDONLY; c->wal_feeder_port = i32; } else if ( cmpNameAtoms( opt->name, _name__namespace) ) { @@ -2084,20 +2129,26 @@ cmp_tarantool_cfg(tarantool_cfg* c1, tarantool_cfg* c2, int only_check_rdonly) { return diff; } - if (c1->remote_hot_standby != c2->remote_hot_standby) { - snprintf(diff, PRINTBUFLEN - 1, "%s", "c->remote_hot_standby"); + if (!only_check_rdonly) { + if (c1->remote_hot_standby != c2->remote_hot_standby) { + snprintf(diff, PRINTBUFLEN - 1, "%s", "c->remote_hot_standby"); - return diff; + return diff; + } } - if (confetti_strcmp(c1->wal_feeder_ipaddr, c2->wal_feeder_ipaddr) != 0) { - snprintf(diff, PRINTBUFLEN - 1, "%s", "c->wal_feeder_ipaddr"); + if (!only_check_rdonly) { + if (confetti_strcmp(c1->wal_feeder_ipaddr, c2->wal_feeder_ipaddr) != 0) { + snprintf(diff, PRINTBUFLEN - 1, "%s", "c->wal_feeder_ipaddr"); - return diff; + return diff; } - if (c1->wal_feeder_port != c2->wal_feeder_port) { - snprintf(diff, PRINTBUFLEN - 1, "%s", "c->wal_feeder_port"); + } + if (!only_check_rdonly) { + if (c1->wal_feeder_port != c2->wal_feeder_port) { + snprintf(diff, PRINTBUFLEN - 1, "%s", "c->wal_feeder_port"); - return diff; + return diff; + } } i1->idx_name__namespace = 0; diff --git a/cfg/tarantool_box_cfg.h b/cfg/tarantool_box_cfg.h index 19d130b16c59fd16044c1034fc9807cd6f77acec..44a9c4a1b27f16485073d5a9eb7534deb31dfb5b 100644 --- a/cfg/tarantool_box_cfg.h +++ b/cfg/tarantool_box_cfg.h @@ -170,8 +170,12 @@ typedef struct tarantool_cfg { #define CNF_STRUCT_DEFINED(s) ((s) != NULL && ((s)->__confetti_flags & CNF_FLAG_STRUCT_NOTSET) == 0) #endif +void init_tarantool_cfg(tarantool_cfg *c); + int fill_default_tarantool_cfg(tarantool_cfg *c); +void swap_tarantool_cfg(struct tarantool_cfg *c1, struct tarantool_cfg *c2); + void parse_cfg_file_tarantool_cfg(tarantool_cfg *c, FILE *fh, int check_rdonly, int *n_accepted, int *n_skipped); void parse_cfg_buffer_tarantool_cfg(tarantool_cfg *c, char *buffer, int check_rdonly, int *n_accepted, int *n_skipped); diff --git a/cfg/tarantool_feeder_cfg.c b/cfg/tarantool_feeder_cfg.c index 654730105b994c4489d404c6b33bc4c3e85025e3..3f5fb82f1f76f89b82bb06505d7c8671b3a8e58f 100644 --- a/cfg/tarantool_feeder_cfg.c +++ b/cfg/tarantool_feeder_cfg.c @@ -24,8 +24,34 @@ cmpNameAtoms(NameAtom *a, NameAtom *b) { return (a == NULL && b == NULL) ? 1 : 0; } +void +init_tarantool_cfg(tarantool_cfg *c) { + c->__confetti_flags = 0; + + c->username = NULL; + c->coredump = 0; + c->admin_port = 0; + c->log_level = 0; + c->slab_alloc_arena = 0; + c->slab_alloc_minimal = 0; + c->slab_alloc_factor = 0; + c->work_dir = NULL; + c->pid_file = NULL; + c->logger = NULL; + c->logger_nonblock = 0; + c->io_collect_interval = 0; + c->backlog = 0; + c->readahead = 0; + c->wal_feeder_bind_ipaddr = NULL; + c->wal_feeder_bind_port = 0; + c->wal_feeder_dir = NULL; + c->custom_proc_title = NULL; +} + int fill_default_tarantool_cfg(tarantool_cfg *c) { + c->__confetti_flags = 0; + c->username = NULL; c->coredump = 0; c->admin_port = 0; @@ -48,6 +74,13 @@ fill_default_tarantool_cfg(tarantool_cfg *c) { return 0; } +void +swap_tarantool_cfg(struct tarantool_cfg *c1, struct tarantool_cfg *c2) { + struct tarantool_cfg tmpcfg = *c1; + *c1 = *c2; + *c2 = tmpcfg; +} + static NameAtom _name__username[] = { { "username", -1, NULL } }; diff --git a/cfg/tarantool_feeder_cfg.h b/cfg/tarantool_feeder_cfg.h index 20a2349bed60ba855b6b6dc263c03c8b4f3aed70..5ac2d0a5f34e51daf5b7d37bd4b2877a6044444d 100644 --- a/cfg/tarantool_feeder_cfg.h +++ b/cfg/tarantool_feeder_cfg.h @@ -89,8 +89,12 @@ typedef struct tarantool_cfg { #define CNF_STRUCT_DEFINED(s) ((s) != NULL && ((s)->__confetti_flags & CNF_FLAG_STRUCT_NOTSET) == 0) #endif +void init_tarantool_cfg(tarantool_cfg *c); + int fill_default_tarantool_cfg(tarantool_cfg *c); +void swap_tarantool_cfg(struct tarantool_cfg *c1, struct tarantool_cfg *c2); + void parse_cfg_file_tarantool_cfg(tarantool_cfg *c, FILE *fh, int check_rdonly, int *n_accepted, int *n_skipped); void parse_cfg_buffer_tarantool_cfg(tarantool_cfg *c, char *buffer, int check_rdonly, int *n_accepted, int *n_skipped); diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 2977ac0b3bfefc5e00a2b5cfdeabdccf4335c4ed..e8cb94d5081188d233c47c6a4eb807c38afd8197 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -56,7 +56,7 @@ set (recompiled_core_sources set (common_sources tbuf.m palloc.m util.m diagnostics.m salloc.m pickle.m coro.m stat.m log_io.m - log_io_remote.m iproto.m exceptions.m errcode.c) + log_io_remote.m iproto.m exceptions.m errcode.c latch.m) if (ENABLE_TRACE) set (common_sources ${common_sources} trace.m) diff --git a/core/fiber.m b/core/fiber.m index 1e86df88ced53a4ae06d267b1ab04ba77fed5dc0..682f68079cffe8aabe8576a7dc785ec314d1a633 100644 --- a/core/fiber.m +++ b/core/fiber.m @@ -56,7 +56,7 @@ #include <pickle.h> #include "diagnostics.h" -@implementation tnt_FiberException +@implementation tnt_FiberCancelException @end static struct fiber sched; @@ -121,14 +121,94 @@ fiber_call(struct fiber *callee) coro_transfer(&caller->coro.ctx, &callee->coro.ctx); } + +/** Interrupt a synchronous wait of a fiber inside the event loop. + * We do so by keeping an "async" event in every fiber, solely + * for this purpose, and raising this event here. + */ + +void +fiber_wakeup(struct fiber *f) +{ + ev_async_start(&f->async); + ev_async_send(&f->async); +} + +/** Cancel the subject fiber. + * + * Note: this is not guaranteed to succeed, and requires a level + * of cooperation on behalf of the fiber. A fiber may opt to set + * FIBER_CANCELLABLE to false, and never test that it was + * cancelled. Such fiber we won't be ever to cancel, ever, and + * for such fiber this call will lead to an infinite wait. + * However, fiber_testcancel() is embedded to the rest of fiber_* + * API (@sa yield()), which makes most of the fibers that opt in, + * cancellable. + * + * Currently cancellation can only be synchronous: this call + * returns only when the subject fiber has terminated. + * + * The fiber which is cancelled, has tnt_FiberCancelException + * raised in it. For cancellation to work, this exception type + * should be re-raised whenever (if) it is caught. + */ + +void +fiber_cancel(struct fiber *f) +{ + assert(fiber->fid != 0); + assert(!(f->flags & FIBER_CANCEL)); + + f->flags |= FIBER_CANCEL; + + if (f->flags & FIBER_CANCELLABLE) + fiber_wakeup(f); + + assert(f->waiter == NULL); + f->waiter = fiber; + + @try { + yield(); + } + @finally { + f->waiter = NULL; + } +} + + +/** Test if this fiber is in a cancellable state and was indeed + * cancelled, and raise an exception (tnt_FiberCancelException) if + * that's the case. + */ + void -fiber_raise(struct fiber *callee) +fiber_testcancel(void) { - callee->flags |= FIBER_RAISE; + if (!(fiber->flags & FIBER_CANCELLABLE)) + return; + + if (!(fiber->flags & FIBER_CANCEL)) + return; - fiber_call(callee); + tnt_raise(tnt_FiberCancelException, reason:"fiber_testcancel"); } +/** Change the current cancellation state of a fiber. This is not + * a cancellation point. + */ + +void fiber_setcancelstate(bool enable) +{ + if (enable == true) + fiber->flags |= FIBER_CANCELLABLE; + else + fiber->flags &= ~FIBER_CANCELLABLE; +} + +/** + * @note: this is a cancellation point (@sa fiber_testcancel()) + */ + void yield(void) { @@ -141,34 +221,48 @@ yield(void) callee->csw++; coro_transfer(&caller->coro.ctx, &callee->coro.ctx); - if (fiber->flags & FIBER_RAISE) { - fiber->flags &= ~FIBER_RAISE; - - tnt_raise(tnt_FiberException, reason:"fiber_raise"); - } + fiber_testcancel(); } +/** + * @note: this is a cancellation point (@sa fiber_testcancel()) + */ + void fiber_sleep(ev_tstamp delay) { ev_timer_set(&fiber->timer, delay, 0.); ev_timer_start(&fiber->timer); - yield(); + @try { + yield(); + } + @finally { + ev_timer_stop(&fiber->timer); + } } - -/** Wait for a forked child to complete. */ +/** Wait for a forked child to complete. + * @note: this is a cancellation point (@sa fiber_testcancel()). +*/ void wait_for_child(pid_t pid) { ev_child_set(&fiber->cw, pid, 0); ev_child_start(&fiber->cw); - yield(); - ev_child_stop(&fiber->cw); + @try { + yield(); + } + @finally { + ev_child_stop(&fiber->cw); + } } -void +/** + * @note: this is a cancellation point. + */ + +static void wait_for(int events) { ev_io *io = &fiber->io; @@ -182,10 +276,17 @@ wait_for(int events) if (!ev_is_active(io)) ev_io_start(io); - yield(); + @try { + yield(); + } + @catch (id o) + { + ev_io_stop(io); + @throw; + } } -void +static void unwait(int events) { ev_io *io = &fiber->io; @@ -338,14 +439,20 @@ fiber_zombificate() static void fiber_loop(void *data __attribute__((unused))) { - while (42) { + while (true) { assert(fiber != NULL && fiber->f != NULL && fiber->fid != 0); @try { fiber->f(fiber->f_data); } - @catch (tnt_FiberException *e) { - say_info("fiber `%s': exception `tnt_FiberException': `%s'", - fiber->name, e->reason); + @catch (tnt_FiberCancelException *e) { + say_info("fiber `%s' has been cancelled", fiber->name); + + if (fiber->waiter != NULL) { + fiber_call(fiber->waiter); + + fiber->waiter = NULL; + } + say_info("fiber `%s': exiting", fiber->name); } @catch (tnt_Exception *e) { @@ -364,9 +471,10 @@ fiber_loop(void *data __attribute__((unused))) } } - -/** Set fiber name. -* @Param[in] name the new name of the fiber. Truncated to FIBER_NAME_MAXLEN. +/** Set fiber name. + * + * @param[in] name the new name of the fiber. Truncated to + * FIBER_NAME_MAXLEN. */ void @@ -403,9 +511,10 @@ fiber_create(const char *name, int fd, int inbox_size, void (*f) (void *), void fiber_alloc(fiber); ev_init(&fiber->io, (void *)ev_schedule); + ev_async_init(&fiber->async, (void *)ev_schedule); ev_init(&fiber->timer, (void *)ev_schedule); ev_init(&fiber->cw, (void *)ev_schedule); - fiber->io.data = fiber->timer.data = fiber->cw.data = fiber; + fiber->io.data = fiber->async.data = fiber->timer.data = fiber->cw.data = fiber; SLIST_INSERT_HEAD(&fibers, fiber, link); } @@ -416,6 +525,7 @@ fiber_create(const char *name, int fd, int inbox_size, void (*f) (void *), void while (++last_used_fid <= 100) ; /* fids from 0 to 100 are reserved */ fiber->fid = last_used_fid; fiber->flags = 0; + fiber->waiter = NULL; fiber_set_name(fiber, name); palloc_set_name(fiber->pool, fiber->name); register_fid(fiber); @@ -505,13 +615,21 @@ inbox_size(struct fiber *recipient) return ring_size(recipient->inbox); } +/** + * @note: this is a cancellation point (@sa fiber_testcancel()) + */ + void wait_inbox(struct fiber *recipient) { while (ring_size(recipient->inbox) == 0) { recipient->flags |= FIBER_READING_INBOX; - yield(); - recipient->flags &= ~FIBER_READING_INBOX; + @try { + yield(); + } + @finally { + recipient->flags &= ~FIBER_READING_INBOX; + } } } @@ -532,14 +650,23 @@ write_inbox(struct fiber *recipient, struct tbuf *msg) return true; } + +/** + * @note: this is a cancellation point (@sa fiber_testcancel()) + */ + struct msg * read_inbox(void) { struct ring *restrict inbox = fiber->inbox; while (ring_size(inbox) == 0) { fiber->flags |= FIBER_READING_INBOX; - yield(); - fiber->flags &= ~FIBER_READING_INBOX; + @try { + yield(); + } + @finally { + fiber->flags &= ~FIBER_READING_INBOX; + } } struct msg *msg = inbox->ring[inbox->tail]; @@ -549,6 +676,10 @@ read_inbox(void) return msg; } +/** + * @note: this is a cancellation point. + */ + int fiber_bread(struct tbuf *buf, size_t at_least) { @@ -581,6 +712,10 @@ add_iov_dup(void *buf, size_t len) add_iov(copy, len); } +/** + * @note: this is a cancellation point. + */ + ssize_t fiber_flush_output(void) { @@ -627,6 +762,10 @@ fiber_flush_output(void) return result; } +/** + * @note: this is a cancellation point. + */ + ssize_t fiber_read(void *buf, size_t count) { @@ -646,11 +785,15 @@ fiber_read(void *buf, size_t count) } done += r; } - unwait(EV_READ); + return done; } +/** + * @note: this is a cancellation point. + */ + ssize_t fiber_write(const void *buf, size_t count) { @@ -670,11 +813,15 @@ fiber_write(const void *buf, size_t count) } done += r; } - unwait(EV_WRITE); + return done; } +/** + * @note: this is a cancellation point. + */ + int fiber_connect(struct sockaddr_in *addr) { @@ -703,11 +850,11 @@ fiber_connect(struct sockaddr_in *addr) errno = error; goto error; } - unwait(EV_WRITE); + return fiber->fd; + error: - unwait(EV_WRITE); fiber_close(); return fiber->fd; } @@ -1026,81 +1173,8 @@ tcp_server_handler(void *data) say_syserror("accept"); continue; } - - } -} - -static void -udp_server_handler(void *data) -{ - struct fiber_server *server = fiber->data; - bool warning_said = false; - struct sockaddr_in sin; - - if ((fiber->fd = socket(AF_INET, SOCK_DGRAM, 0)) == -1) { - say_syserror("socket"); - exit(EX_OSERR); - } - - if (set_nonblock(fiber->fd) == -1) - exit(EX_OSERR); - - memset(&sin, 0, sizeof(struct sockaddr_in)); - sin.sin_family = AF_INET; - sin.sin_port = htons(server->port); - sin.sin_addr.s_addr = INADDR_ANY; - - for (;;) { - if (bind(fiber->fd, (struct sockaddr *)&sin, sizeof(sin)) == -1) { - if (errno == EADDRINUSE) - goto sleep_and_retry; - say_syserror("bind"); - exit(EX_OSERR); - } - - say_info("bound to UDP port %i", server->port); - break; - - sleep_and_retry: - if (!warning_said) { - say_warn("port %i is already in use, " - "will retry binding after 0.1 seconds.", server->port); - warning_said = true; - } - fiber_sleep(0.1); - } - - if (server->on_bind != NULL) - server->on_bind(server->data); - - while (1) { -#define MAXUDPPACKETLEN 128 - char buf[MAXUDPPACKETLEN]; - struct sockaddr_in addr; - socklen_t addrlen; - ssize_t sz; - - wait_for(EV_READ); - - for (;;) { - addrlen = sizeof(addr); - sz = recvfrom(fiber->fd, buf, MAXUDPPACKETLEN, MSG_DONTWAIT, - (struct sockaddr *)&addr, &addrlen); - - if (sz <= 0) { - if (!(errno == EAGAIN || errno == EWOULDBLOCK)) - say_syserror("recvfrom"); - break; - } else { - if (server->handler) { - server->handler(data); - } else { - void (*f) (char *, int) = data; - f(buf, (int)sz); - } - } - } } + unwait(EV_READ); } struct fiber * @@ -1111,10 +1185,11 @@ fiber_server(fiber_server_type type, int port, void (*handler) (void *data), voi struct fiber_server *server; struct fiber *s; + assert(type == tcp_server); + server_name = palloc(eter_pool, 64); snprintf(server_name, 64, "%i/acceptor", port); - s = fiber_create(server_name, -1, -1, - (type == tcp_server) ? tcp_server_handler : udp_server_handler, data); + s = fiber_create(server_name, -1, -1, tcp_server_handler, data); s->data = server = palloc(eter_pool, sizeof(struct fiber_server)); assert(server != NULL); server->port = port; diff --git a/core/latch.m b/core/latch.m new file mode 100644 index 0000000000000000000000000000000000000000..2c3b5268ee9159dbdb161e9a53b2ba7e20e02ee9 --- /dev/null +++ b/core/latch.m @@ -0,0 +1,65 @@ +/* + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: 1. Redistributions of source code must + * retain the above copyright notice, this list of conditions and + * the following disclaimer. 2. Redistributions in binary form + * must reproduce the above copyright notice, this list of + * conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include <latch.h> + +void +tnt_latch_init(struct tnt_latch *latch) +{ + latch->locked = false; + latch->owner = NULL; +} + +void +tnt_latch_destroy(struct tnt_latch *latch) +{ + assert(latch->locked == false); + + latch->owner = NULL; +} + +int +tnt_latch_trylock(struct tnt_latch *latch) +{ + if (latch->locked) { + assert(latch->owner != fiber); + + return -1; + } + + assert(latch->owner == NULL); + + latch->locked = true; + latch->owner = fiber; + + return 0; +} + +void +tnt_latch_unlock(struct tnt_latch *latch) +{ + assert(latch->owner == fiber); + + latch->locked = false; + latch->owner = NULL; +} diff --git a/core/log_io_remote.m b/core/log_io_remote.m index db02df43c00966083aaf7cfa76309caccd628e06..9da5ed6d15d9bd76737e6249c8c88e0d1e2808f8 100644 --- a/core/log_io_remote.m +++ b/core/log_io_remote.m @@ -134,7 +134,10 @@ pull_from_remote(void *state) struct tbuf *row; for (;;) { + fiber_setcancelstate(true); row = remote_read_row(h->r->confirmed_lsn + 1); + fiber_setcancelstate(false); + h->r->recovery_lag = ev_now() - row_v11(row)->tm; h->r->recovery_last_update_tstamp = ev_now(); diff --git a/core/tarantool.m b/core/tarantool.m index 918fb6103fa633cece4a705f1c5ea08df1a4b993..d8e5f2a64f119713453fa0f880c3824dd3949b2f 100644 --- a/core/tarantool.m +++ b/core/tarantool.m @@ -45,6 +45,7 @@ #include <admin.h> #include <fiber.h> #include <iproto.h> +#include <latch.h> #include <log_io.h> #include <palloc.h> #include <salloc.h> @@ -99,58 +100,72 @@ load_cfg(struct tarantool_cfg *conf, i32 check_rdonly) return mod_check_config(conf); } + i32 reload_cfg(struct tbuf *out) { - struct tarantool_cfg new_cfg1, new_cfg2; - i32 ret; - - // Load with checking readonly params - if (dup_tarantool_cfg(&new_cfg1, &cfg) != 0) { - destroy_tarantool_cfg(&new_cfg1); + static struct tnt_latch *latch = NULL; + struct tarantool_cfg new_cfg, aux_cfg; - return -1; + if (latch == NULL) { + latch = palloc(eter_pool, sizeof(*latch)); + tnt_latch_init(latch); } - ret = load_cfg(&new_cfg1, 1); - if (ret == -1) { - tbuf_append(out, cfg_out->data, cfg_out->len); - destroy_tarantool_cfg(&new_cfg1); + if (tnt_latch_trylock(latch) == -1) { + out_warning(0, "Could not reload configuration: it is being reloaded right now"); + tbuf_append(out, cfg_out->data, cfg_out->len); return -1; } - // Load without checking readonly params - if (fill_default_tarantool_cfg(&new_cfg2) != 0) { - destroy_tarantool_cfg(&new_cfg2); - return -1; - } - ret = load_cfg(&new_cfg2, 0); - if (ret == -1) { - tbuf_append(out, cfg_out->data, cfg_out->len); + @try { + init_tarantool_cfg(&new_cfg); + init_tarantool_cfg(&aux_cfg); - destroy_tarantool_cfg(&new_cfg1); + /* + Prepare a copy of the original config file + for confetti, so that it can compare the new + file with the old one when loading the new file. + Load the new file and return an error if it + contains a different value for some read-only + parameter. + */ + if (dup_tarantool_cfg(&aux_cfg, &cfg) != 0 || + load_cfg(&aux_cfg, 1) != 0) + return -1; + /* + Load the new configuration file, but + skip the check for read only parameters. + new_cfg contains only defaults and + new settings. + */ + if (fill_default_tarantool_cfg(&new_cfg) != 0 || + load_cfg(&new_cfg, 0) != 0) + return -1; + + /* Check that no default value has been changed. */ + char *diff = cmp_tarantool_cfg(&aux_cfg, &new_cfg, 1); + if (diff != NULL) { + out_warning(0, "Could not accept read only '%s' option", diff); + return -1; + } - return -1; + /* Now pass the config to the module, to take action. */ + if (mod_reload_config(&cfg, &new_cfg) != 0) + return -1; + /* All OK, activate the config. */ + swap_tarantool_cfg(&cfg, &new_cfg); } - // Compare only readonly params - char *diff = cmp_tarantool_cfg(&new_cfg1, &new_cfg2, 1); - if (diff != NULL) { - destroy_tarantool_cfg(&new_cfg1); - destroy_tarantool_cfg(&new_cfg2); + @finally { + destroy_tarantool_cfg(&aux_cfg); + destroy_tarantool_cfg(&new_cfg); - out_warning(0, "Could not accept read only '%s' option", diff); - tbuf_append(out, cfg_out->data, cfg_out->len); + if (cfg_out->len != 0) + tbuf_append(out, cfg_out->data, cfg_out->len); - return -1; + tnt_latch_unlock(latch); } - destroy_tarantool_cfg(&new_cfg1); - - mod_reload_config(&cfg, &new_cfg2); - - destroy_tarantool_cfg(&cfg); - - cfg = new_cfg2; return 0; } diff --git a/include/exceptions.h b/include/exceptions.h index f78206c65d14d836156612270f8a18cc0885a249..52dc6eee54d7dfb9822b7c9b08cb3a381eb99ea2 100644 --- a/include/exceptions.h +++ b/include/exceptions.h @@ -3,6 +3,12 @@ #include <objc/Object.h> +/** The base class for all exceptions. + * + * Note: implements garbage collection (see +alloc + * implementation). + */ + @interface tnt_Exception: Object { @public const char *file; diff --git a/include/fiber.h b/include/fiber.h index 5ab9e751cea45b8d95ea3d47974f94874e9513b3..08f259eb90fbe23ec52bedf18c6dd18e7bf8079f 100644 --- a/include/fiber.h +++ b/include/fiber.h @@ -45,9 +45,16 @@ #define FIBER_NAME_MAXLEN 16 #define FIBER_READING_INBOX 0x1 -#define FIBER_RAISE 0x2 +/** Can this fiber be cancelled? */ +#define FIBER_CANCELLABLE 0x2 +/** Indicates that a fiber has been cancelled. */ +#define FIBER_CANCEL 0x4 -@interface tnt_FiberException: tnt_Exception +/** This is thrown by fiber_* API calls when the fiber is + * cancelled. + */ + +@interface tnt_FiberCancelException: tnt_Exception @end struct msg { @@ -62,6 +69,7 @@ struct ring { struct fiber { ev_io io; + ev_async async; #ifdef ENABLE_BACKTRACE void *last_stack_frame; #endif @@ -96,6 +104,8 @@ struct fiber { char peer_name[32]; u32 flags; + + struct fiber *waiter; }; SLIST_HEAD(, fiber) fibers, zombie_fibers; @@ -118,9 +128,7 @@ extern struct fiber *fiber; void fiber_init(void); struct fiber *fiber_create(const char *name, int fd, int inbox_size, void (*f) (void *), void *); void fiber_set_name(struct fiber *fiber, const char *name); -void wait_for(int events); void wait_for_child(pid_t pid); -void unwait(int events); void yield(void); void fiber_destroy_all(); @@ -162,7 +170,22 @@ ssize_t fiber_flush_output(void); void fiber_cleanup(void); void fiber_gc(void); void fiber_call(struct fiber *callee); -void fiber_raise(struct fiber *callee); +void fiber_wakeup(struct fiber *f); +/** Cancel a fiber. A cancelled fiber will have + * tnt_FiberCancelException raised in it. + * + * A fiber can be cancelled only if it is + * FIBER_CANCELLABLE flag is set. + */ +void fiber_cancel(struct fiber *f); +/** Check if the current fiber has been cancelled. Raises + * tnt_FiberCancelException + */ +void fiber_testcancel(void); +/** Make it possible or not possible to cancel the current + * fiber. + */ +void fiber_setcancelstate(bool enable); int fiber_connect(struct sockaddr_in *addr); void fiber_sleep(ev_tstamp s); void fiber_info(struct tbuf *out); @@ -170,7 +193,6 @@ int set_nonblock(int sock); typedef enum fiber_server_type { tcp_server, - udp_server } fiber_server_type; struct fiber *fiber_server(fiber_server_type type, int port, void (*handler) (void *), void *, diff --git a/include/latch.h b/include/latch.h new file mode 100644 index 0000000000000000000000000000000000000000..a86c35a15d7b8911e2800b2b3ca075515b9a003f --- /dev/null +++ b/include/latch.h @@ -0,0 +1,67 @@ +#ifndef TARANTOOL_LATCH_H_INCLUDED +#define TARANTOOL_LATCH_H_INCLUDED +/* + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: 1. Redistributions of source code must + * retain the above copyright notice, this list of conditions and + * the following disclaimer. 2. Redistributions in binary form + * must reproduce the above copyright notice, this list of + * conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include <fiber.h> + +#include <stdbool.h> + +/* + * Internal implementation of a container for a mutex like object + * with similar interface. It's used boolean variable because of + * single threaded nature of tarantool. But it's rather simple to change + * this variable to a mutex object to maintain multi threaded approach. + */ +struct tnt_latch { + bool locked; + + struct fiber *owner; +}; + +/** + * Initialize the given latch. + * + * @param latch Latch to be initialized. + */ +void tnt_latch_init(struct tnt_latch *latch); +/** + * Destroy the given latch. + */ +void tnt_latch_destroy(struct tnt_latch *latch); +/** + * Set the latch to the locked state. If it's already locked + * returns -1 value immediately otherwise returns 0. + * + * @param latch Latch to be locked. + */ +int tnt_latch_trylock(struct tnt_latch *latch); +/** + * Unlock the locked latch. + * + * @param latch Latch to be unlocked. + */ +void tnt_latch_unlock(struct tnt_latch *latch); + + +#endif /* TARANTOOL_LATCH_H_INCLUDED */ diff --git a/include/tarantool.h b/include/tarantool.h index fe58812ea1692509ebea0f5d9debb647ef5f1da5..29301ecdb9c8f8ce602ef8086ca4f56a8246a0e5 100644 --- a/include/tarantool.h +++ b/include/tarantool.h @@ -36,7 +36,7 @@ struct tarantool_cfg; extern const char *mod_name; i32 mod_check_config(struct tarantool_cfg *conf); -void mod_reload_config(struct tarantool_cfg *old_conf, struct tarantool_cfg *new_conf); +i32 mod_reload_config(struct tarantool_cfg *old_conf, struct tarantool_cfg *new_conf); int mod_cat(const char *filename); void mod_snapshot(struct log_io_iter *); void mod_info(struct tbuf *out); diff --git a/mod/box/box.m b/mod/box/box.m index a63f1fe4cb0aa2c76ef203f3f6910e97e814d468..0f49c4cefe8a433d48fd8c2dba293c81f86fe1ef 100644 --- a/mod/box/box.m +++ b/mod/box/box.m @@ -31,6 +31,7 @@ #include <errno.h> #include <arpa/inet.h> +#include <cfg/warning.h> #include <errcode.h> #include <fiber.h> #include <iproto.h> @@ -57,6 +58,8 @@ STRS(messages, MESSAGES); const int MEMCACHED_NAMESPACE = 23; static char *custom_proc_title; +static struct fiber *remote_recover; + /* hooks */ typedef int (*box_hook_t) (struct box_txn * txn); @@ -1330,27 +1333,57 @@ title(const char *fmt, ...) } static void -box_bound_to_primary(void *data __attribute__((unused))) +remote_recovery_restart(struct tarantool_cfg *conf) { - recover_finalize(recovery_state); + if (remote_recover) { + say_info("shutting downing the replica"); + fiber_call(remote_recover); + } - if (cfg.remote_hot_standby) { - say_info("starting a replica"); - status = palloc(eter_pool, 64); - snprintf(status, 64, "replica/%s:%i%s", cfg.wal_feeder_ipaddr, - cfg.wal_feeder_port, custom_proc_title); - recover_follow_remote(recovery_state, cfg.wal_feeder_ipaddr, cfg.wal_feeder_port, - default_remote_row_handler); - - title("replica/%s:%i", cfg.wal_feeder_ipaddr, cfg.wal_feeder_port); + say_info("starting the replica"); + remote_recover = recover_follow_remote(recovery_state, conf->wal_feeder_ipaddr, + conf->wal_feeder_port, + default_remote_row_handler); + + status = palloc(eter_pool, 64); + snprintf(status, 64, "replica/%s:%i%s", conf->wal_feeder_ipaddr, + conf->wal_feeder_port, custom_proc_title); + title("replica/%s:%i%s", conf->wal_feeder_ipaddr, conf->wal_feeder_port, + custom_proc_title); +} + +static void +box_master_or_slave(struct tarantool_cfg *conf) +{ + if (conf->remote_hot_standby) { + box_updates_allowed = false; + + remote_recovery_restart(conf); } else { + if (remote_recover) { + say_info("shuting downing the replica"); + fiber_cancel(remote_recover); + + remote_recover = NULL; + } + say_info("I am primary"); - status = "primary"; + box_updates_allowed = true; + + status = "primary"; title("primary"); } } +static void +box_bound_to_primary(void *data __attribute__((unused))) +{ + recover_finalize(recovery_state); + + box_master_or_slave(&cfg); +} + static void memcached_bound_to_primary(void *data __attribute__((unused))) { @@ -1363,16 +1396,42 @@ memcached_bound_to_primary(void *data __attribute__((unused))) } i32 -mod_check_config(struct tarantool_cfg *conf __attribute__((unused))) +mod_check_config(struct tarantool_cfg *conf) { + if (conf->remote_hot_standby > 0 && conf->local_hot_standby > 0) { + out_warning(0, "Remote and local hot standby modes " + "can't be enabled simultaneously"); + + return -1; + } + return 0; } -void -mod_reload_config(struct tarantool_cfg *old_conf __attribute__((unused)), - struct tarantool_cfg *new_conf __attribute__((unused))) +i32 +mod_reload_config(struct tarantool_cfg *old_conf, struct tarantool_cfg *new_conf) { - return; + if (old_conf->remote_hot_standby != new_conf->remote_hot_standby) { + if (recovery_state->finalize != true) { + out_warning(0, "Could not propagate %s before local recovery finished", + old_conf->remote_hot_standby == true ? "slave to master" : + "master to slave"); + + return -1; + } + } + + if (old_conf->remote_hot_standby != new_conf->remote_hot_standby) { + /* Local recovery must be finalized at this point */ + assert(recovery_state->finalize == true); + + box_master_or_slave(new_conf); + } else if (old_conf->remote_hot_standby > 0 && + (strcmp(old_conf->wal_feeder_ipaddr, new_conf->wal_feeder_ipaddr) != 0 || + old_conf->wal_feeder_port != new_conf->wal_feeder_port)) + remote_recovery_restart(new_conf); + + return 0; } void diff --git a/mod/box/box_cfg.cfg_tmpl b/mod/box/box_cfg.cfg_tmpl index 735d34a54cb82b0926ff1620ff1162fbff008074..d65ad2ed460abbe0ef1191071387cc32686e7ee5 100644 --- a/mod/box/box_cfg.cfg_tmpl +++ b/mod/box/box_cfg.cfg_tmpl @@ -59,9 +59,9 @@ panic_on_wal_error=0, ro # Remote hot standby (if enabled, the server will run in hot standby mode # continuously fetching WAL records from wal_feeder_ipaddr:wal_feeder_port -remote_hot_standby=0, ro -wal_feeder_ipaddr=NULL, ro -wal_feeder_port=0, ro +remote_hot_standby=0 +wal_feeder_ipaddr=NULL +wal_feeder_port=0 namespace = [ diff --git a/mod/feeder/feeder.m b/mod/feeder/feeder.m index 0e9d7389e5dea8f46ccbc44a645e94504803413e..be2f8b8f1fe8c51f3e7f8ad28b638a4fbfbcc0c4 100644 --- a/mod/feeder/feeder.m +++ b/mod/feeder/feeder.m @@ -100,11 +100,11 @@ mod_check_config(struct tarantool_cfg *conf __attribute__((unused))) return 0; } -void +i32 mod_reload_config(struct tarantool_cfg *old_conf __attribute__((unused)), struct tarantool_cfg *new_conf __attribute__((unused))) { - return; + return 0; } void diff --git a/test/box_replication/common.result b/test/box_replication/common.result new file mode 100644 index 0000000000000000000000000000000000000000..27491410571c15b025bed9ce3e4c239752dd31e5 --- /dev/null +++ b/test/box_replication/common.result @@ -0,0 +1,149 @@ + +# +# Insert several tuples. +# + +insert into t0 values (1, 'the tuple #1') +Insert OK, 1 row affected +insert into t0 values (2, 'the tuple #2') +Insert OK, 1 row affected +insert into t0 values (3, 'the tuple #3') +Insert OK, 1 row affected +insert into t0 values (4, 'the tuple #4') +Insert OK, 1 row affected + +# +# Reconfigure slave to replicate from secondary feeder. +# + +reload configuration +--- +ok +... + +# +# Insert more tuples. +# + +insert into t0 values (5, 'the tuple #5') +Insert OK, 1 row affected +insert into t0 values (6, 'the tuple #6') +Insert OK, 1 row affected +insert into t0 values (7, 'the tuple #7') +Insert OK, 1 row affected +insert into t0 values (8, 'the tuple #8') +Insert OK, 1 row affected +insert into t0 values (9, 'the tuple #9') +Insert OK, 1 row affected + +# +# Select from master. +# + +select * from t0 where k0 = 1 +Found 1 tuple: +[1, 'the tuple #1'] +select * from t0 where k0 = 2 +Found 1 tuple: +[2, 'the tuple #2'] +select * from t0 where k0 = 3 +Found 1 tuple: +[3, 'the tuple #3'] +select * from t0 where k0 = 4 +Found 1 tuple: +[4, 'the tuple #4'] +select * from t0 where k0 = 5 +Found 1 tuple: +[5, 'the tuple #5'] +select * from t0 where k0 = 6 +Found 1 tuple: +[6, 'the tuple #6'] +select * from t0 where k0 = 7 +Found 1 tuple: +[7, 'the tuple #7'] +select * from t0 where k0 = 8 +Found 1 tuple: +[8, 'the tuple #8'] +select * from t0 where k0 = 9 +Found 1 tuple: +[9, 'the tuple #9'] + +# +# Select from slave. +# + +select * from t0 where k0 = 1 +Found 1 tuple: +[1, 'the tuple #1'] +select * from t0 where k0 = 2 +Found 1 tuple: +[2, 'the tuple #2'] +select * from t0 where k0 = 3 +Found 1 tuple: +[3, 'the tuple #3'] +select * from t0 where k0 = 4 +Found 1 tuple: +[4, 'the tuple #4'] +select * from t0 where k0 = 5 +Found 1 tuple: +[5, 'the tuple #5'] +select * from t0 where k0 = 6 +Found 1 tuple: +[6, 'the tuple #6'] +select * from t0 where k0 = 7 +Found 1 tuple: +[7, 'the tuple #7'] +select * from t0 where k0 = 8 +Found 1 tuple: +[8, 'the tuple #8'] +select * from t0 where k0 = 9 +Found 1 tuple: +[9, 'the tuple #9'] + +# +# Try to insert into slave. +# + +insert into t0 values (100, 'the tuple #100') +An error occurred: ERR_CODE_NONMASTER, 'Non master connection, but it should be' + +# +# Propagate slave to master. +# + +reload configuration +--- +ok +... + +# +# Try to insert into slave after it was propagated to master. +# + +insert into t0 values (100, 'the tuple #100') +Insert OK, 1 row affected +select * from t0 where k0 = 100 +Found 1 tuple: +[100, 'the tuple #100'] + +# +# Try to propagate beholder to slave. +# + +reload configuration +--- +fail: + - Could not propagate master to slave before local recovery finished +... + +# +# Propagate master to slave. +# + +reload configuration +--- +ok +... +select * from t0 where k0 = 100 +Found 1 tuple: +[100, 'the tuple #100'] diff --git a/test/box_replication/common.test b/test/box_replication/common.test new file mode 100644 index 0000000000000000000000000000000000000000..151abe8d6150f2279cf8166b67afd6be1ea1ef55 --- /dev/null +++ b/test/box_replication/common.test @@ -0,0 +1,137 @@ +import os +from lib.tarantool_box_server import TarantoolBoxServer +from lib.tarantool_feeder_server import TarantoolFeederServer + +master = server + +# Startup master beholder. +beholder = TarantoolBoxServer() +beholder.deploy("box_replication/tarantool_beholder.cfg", + beholder.find_exe(self.args.builddir), + os.path.join(self.args.vardir, "beholder")) + +# Startup master feeder. +feeder = TarantoolFeederServer() +feeder.deploy("box_replication/feeder.cfg", + feeder.find_exe(self.args.builddir), + os.path.join(self.args.vardir, "feeder")) + +# Startup secondary master feeder. +secondary_feeder = TarantoolFeederServer() +secondary_feeder.deploy("box_replication/feeder_secondary.cfg", + secondary_feeder.find_exe(self.args.builddir), + os.path.join(self.args.vardir, "feeder_secondary")) + +# Startup replication server. +slave = TarantoolBoxServer() +slave.deploy("box_replication/tarantool_slave.cfg", + slave.find_exe(self.args.builddir), + os.path.join(self.args.vardir, "slave")) +# Startup replication feeder. +slave_feeder = TarantoolFeederServer() +slave_feeder.deploy("box_replication/feeder_slave.cfg", + slave_feeder.find_exe(self.args.builddir), + os.path.join(self.args.vardir, "slave/feeder")) + +print """ +# +# Insert several tuples. +# +""" +for i in range(1, 5): + master.sql.execute("insert into t0 values ({0}, 'the tuple #{0}')".format(i), + silent=False) + +print """ +# +# Reconfigure slave to replicate from secondary feeder. +# +""" +slave.reconfigure("box_replication/tarantool_slave_secondary_feeder_replication.cfg") +feeder.stop() +feeder.cleanup(True) + +print """ +# +# Insert more tuples. +# +""" +for i in range(5, 10): + master.sql.execute("insert into t0 values ({0}, 'the tuple #{0}')".format(i), + silent=False) + +print """ +# +# Select from master. +# +""" +for i in range(1, 10): + master.sql.execute("select * from t0 where k0 = {0}".format(i), + silent=False) + +slave.wait_sync(10) + +print """ +# +# Select from slave. +# +""" +for i in range(1,10): + slave.sql.execute("select * from t0 where k0 = {0}".format(i), + silent=False) + +print """ +# +# Try to insert into slave. +# +""" +slave.sql.execute("insert into t0 values (100, 'the tuple #100')", + silent=False) + +print """ +# +# Propagate slave to master. +# +""" +slave.reconfigure("box_replication/tarantool_slave_to_master.cfg") + +print """ +# +# Try to insert into slave after it was propagated to master. +# +""" +slave.sql.execute("insert into t0 values (100, 'the tuple #100')", + silent=False) +slave.sql.execute("select * from t0 where k0 = 100", + silent=False) + +print """ +# +# Try to propagate beholder to slave. +# +""" +beholder.reconfigure("box_replication/tarantool_beholder_to_slave.cfg") + +print """ +# +# Propagate master to slave. +# +""" +master.reconfigure("box_replication/tarantool_to_slave.cfg") +master.wait_sync(11) +master.sql.execute("select * from t0 where k0 = 100", + silent=False) + +# Cleanup. +beholder.stop() +beholder.cleanup(True) +secondary_feeder.stop() +secondary_feeder.cleanup(True) +slave_feeder.stop() +slave_feeder.cleanup(True) +slave.stop() +slave.cleanup(True) +server.stop() +server.deploy(self.suite_ini["config"]) + +# vim: syntax=python diff --git a/test/box_replication/feeder.cfg b/test/box_replication/feeder.cfg new file mode 100644 index 0000000000000000000000000000000000000000..99eae82a96660cef3cd319cb07c04da0679443ca --- /dev/null +++ b/test/box_replication/feeder.cfg @@ -0,0 +1,6 @@ +pid_file = "tarantool.pid" + +wal_feeder_bind_ipaddr = "127.0.0.1" +wal_feeder_bind_port = 33016 + +wal_feeder_dir = "../" diff --git a/test/box_replication/feeder_secondary.cfg b/test/box_replication/feeder_secondary.cfg new file mode 100644 index 0000000000000000000000000000000000000000..59e389be751b0212b4927e0eaf61cc8a50d00435 --- /dev/null +++ b/test/box_replication/feeder_secondary.cfg @@ -0,0 +1,6 @@ +pid_file = "tarantool.pid" + +wal_feeder_bind_ipaddr = "127.0.0.1" +wal_feeder_bind_port = 33026 + +wal_feeder_dir = "../" diff --git a/test/box_replication/feeder_slave.cfg b/test/box_replication/feeder_slave.cfg new file mode 100644 index 0000000000000000000000000000000000000000..fbe20480cb82e89927bf68baa7f4dae233fc994e --- /dev/null +++ b/test/box_replication/feeder_slave.cfg @@ -0,0 +1,6 @@ +pid_file = "tarantool.pid" + +wal_feeder_bind_ipaddr = "127.0.0.1" +wal_feeder_bind_port = 33116 + +wal_feeder_dir = "../" diff --git a/test/box_replication/suite.ini b/test/box_replication/suite.ini new file mode 100644 index 0000000000000000000000000000000000000000..830fb400442f24d3681d713b03769b882f0a7ca7 --- /dev/null +++ b/test/box_replication/suite.ini @@ -0,0 +1,3 @@ +[default] +description = tarantool/box, replication +config = tarantool.cfg diff --git a/test/box_replication/tarantool.cfg b/test/box_replication/tarantool.cfg new file mode 100644 index 0000000000000000000000000000000000000000..212ed8616732320f922c9f137faf77e583b77033 --- /dev/null +++ b/test/box_replication/tarantool.cfg @@ -0,0 +1,11 @@ +pid_file = "tarantool.pid" + +primary_port = 33013 +secondary_port = 33014 +admin_port = 33015 + +namespace[0].enabled = 1 +namespace[0].index[0].type = "HASH" +namespace[0].index[0].unique = 1 +namespace[0].index[0].key_field[0].fieldno = 0 +namespace[0].index[0].key_field[0].type = "NUM" diff --git a/test/box_replication/tarantool_beholder.cfg b/test/box_replication/tarantool_beholder.cfg new file mode 100644 index 0000000000000000000000000000000000000000..14d623b2d1e00a7880f24dd59aece90698226f97 --- /dev/null +++ b/test/box_replication/tarantool_beholder.cfg @@ -0,0 +1,11 @@ +pid_file = "tarantool.pid" + +primary_port = 33013 +secondary_port = 33024 +admin_port = 33025 + +namespace[0].enabled = 1 +namespace[0].index[0].type = "HASH" +namespace[0].index[0].unique = 1 +namespace[0].index[0].key_field[0].fieldno = 0 +namespace[0].index[0].key_field[0].type = "NUM" diff --git a/test/box_replication/tarantool_beholder_to_slave.cfg b/test/box_replication/tarantool_beholder_to_slave.cfg new file mode 100644 index 0000000000000000000000000000000000000000..e4e8a1af38c7b2d3cec83c2bc6a3e9662ffb619b --- /dev/null +++ b/test/box_replication/tarantool_beholder_to_slave.cfg @@ -0,0 +1,15 @@ +pid_file = "tarantool.pid" + +primary_port = 33013 +secondary_port = 33024 +admin_port = 33025 + +namespace[0].enabled = 1 +namespace[0].index[0].type = "HASH" +namespace[0].index[0].unique = 1 +namespace[0].index[0].key_field[0].fieldno = 0 +namespace[0].index[0].key_field[0].type = "NUM" + +remote_hot_standby = 1 +wal_feeder_ipaddr = "127.0.0.1" +wal_feeder_port = 33116 diff --git a/test/box_replication/tarantool_slave.cfg b/test/box_replication/tarantool_slave.cfg new file mode 100644 index 0000000000000000000000000000000000000000..3170af2d724c10d0a2ff02197783435c065908f2 --- /dev/null +++ b/test/box_replication/tarantool_slave.cfg @@ -0,0 +1,15 @@ +pid_file = "tarantool.pid" + +primary_port = 33113 +secondary_port = 33114 +admin_port = 33115 + +namespace[0].enabled = 1 +namespace[0].index[0].type = "HASH" +namespace[0].index[0].unique = 1 +namespace[0].index[0].key_field[0].fieldno = 0 +namespace[0].index[0].key_field[0].type = "NUM" + +remote_hot_standby = 1 +wal_feeder_ipaddr = "127.0.0.1" +wal_feeder_port = 33016 diff --git a/test/box_replication/tarantool_slave_secondary_feeder_replication.cfg b/test/box_replication/tarantool_slave_secondary_feeder_replication.cfg new file mode 100644 index 0000000000000000000000000000000000000000..0046d845c06c8e243ef2d0a7be39c0ee6ed7fac6 --- /dev/null +++ b/test/box_replication/tarantool_slave_secondary_feeder_replication.cfg @@ -0,0 +1,15 @@ +pid_file = "tarantool.pid" + +primary_port = 33113 +secondary_port = 33114 +admin_port = 33115 + +namespace[0].enabled = 1 +namespace[0].index[0].type = "HASH" +namespace[0].index[0].unique = 1 +namespace[0].index[0].key_field[0].fieldno = 0 +namespace[0].index[0].key_field[0].type = "NUM" + +remote_hot_standby = 1 +wal_feeder_ipaddr = "127.0.0.1" +wal_feeder_port = 33026 diff --git a/test/box_replication/tarantool_slave_to_master.cfg b/test/box_replication/tarantool_slave_to_master.cfg new file mode 100644 index 0000000000000000000000000000000000000000..4cc0c24f10895b32b8a660f23fcb95b8aca6130a --- /dev/null +++ b/test/box_replication/tarantool_slave_to_master.cfg @@ -0,0 +1,11 @@ +pid_file = "tarantool.pid" + +primary_port = 33113 +secondary_port = 33114 +admin_port = 33115 + +namespace[0].enabled = 1 +namespace[0].index[0].type = "HASH" +namespace[0].index[0].unique = 1 +namespace[0].index[0].key_field[0].fieldno = 0 +namespace[0].index[0].key_field[0].type = "NUM" diff --git a/test/box_replication/tarantool_to_slave.cfg b/test/box_replication/tarantool_to_slave.cfg new file mode 100644 index 0000000000000000000000000000000000000000..495fb11c357ec373c458a908c0139243e6fba14c --- /dev/null +++ b/test/box_replication/tarantool_to_slave.cfg @@ -0,0 +1,15 @@ +pid_file = "tarantool.pid" + +primary_port = 33013 +secondary_port = 33014 +admin_port = 33015 + +namespace[0].enabled = 1 +namespace[0].index[0].type = "HASH" +namespace[0].index[0].unique = 1 +namespace[0].index[0].key_field[0].fieldno = 0 +namespace[0].index[0].key_field[0].type = "NUM" + +remote_hot_standby = 1 +wal_feeder_ipaddr = "127.0.0.1" +wal_feeder_port = 33116 diff --git a/test/lib/box.py b/test/lib/box.py index 5ee043df037b76b46de86eb67ae7c4779071891d..edc3f0f90a0612113964ad501a13b4bbc2c433db 100644 --- a/test/lib/box.py +++ b/test/lib/box.py @@ -34,7 +34,7 @@ class Box(TarantoolConnection): res = res + buf return res - def execute_no_reconnect(self, command, noprint=True): + def execute_no_reconnect(self, command, silent=True): statement = sql.parse("sql", command) if statement == None: return "You have an error in your SQL syntax\n" @@ -57,5 +57,9 @@ class Box(TarantoolConnection): else: response = None + if not silent: + print command + print statement.unpack(response) + return statement.unpack(response) + "\n" diff --git a/test/lib/server.py b/test/lib/server.py index 659da68af99cb58768d4f61859fea461db2da78c..27154999e2a484ddf8c623bcb60189079ea578f5 100644 --- a/test/lib/server.py +++ b/test/lib/server.py @@ -92,12 +92,14 @@ class Server(object): return exe raise RuntimeError("Can't find server executable in " + path) - def cleanup(self): + def cleanup(self, full=False): trash = [] for re in self.re_vardir_cleanup: trash += glob.glob(os.path.join(self.vardir, re)) for filename in trash: os.remove(filename) + if full: + shutil.rmtree(self.vardir) def configure(self, config): self.config = os.path.abspath(config) diff --git a/test/lib/tarantool_admin.py b/test/lib/tarantool_admin.py index 4788002c2933f24e16659c7fcd279c2b9a1a67d7..bce37e34e437514a82535ceb9b7da9e09666e7da 100644 --- a/test/lib/tarantool_admin.py +++ b/test/lib/tarantool_admin.py @@ -29,7 +29,7 @@ from tarantool_connection import TarantoolConnection is_admin_re = re.compile("^\s*(show|save|exec|exit|reload|help)", re.I) class TarantoolAdmin(TarantoolConnection): - def execute_no_reconnect(self, command, noprint): + def execute_no_reconnect(self, command, silent): self.socket.sendall(command) bufsiz = 4096 @@ -46,7 +46,7 @@ class TarantoolAdmin(TarantoolConnection): # validate yaml by parsing it yaml.load(res) - if not noprint: + if not silent: print command.replace('\n', '') print res[:-1] return res diff --git a/test/lib/tarantool_box_server.py b/test/lib/tarantool_box_server.py index 5fde641df44cdd720dcc9b537499e609c2eb3998..04bfa7ffef64cb39e074dfc4ad9f219e036001ac 100644 --- a/test/lib/tarantool_box_server.py +++ b/test/lib/tarantool_box_server.py @@ -5,6 +5,7 @@ import ConfigParser from tarantool_server import TarantoolServer, TarantoolConfigFile from tarantool_admin import TarantoolAdmin from box import Box +import time class TarantoolBoxServer(TarantoolServer): def __new__(cls, core="tarantool", module="box"): @@ -33,3 +34,12 @@ class TarantoolBoxServer(TarantoolServer): stdout = subprocess.PIPE, stderr = subprocess.PIPE) + def wait_sync(self, lsn): + synced = 0 + while synced == 0: + synced = 1 + data = self.admin.execute("show info\n", silent=True) + info = yaml.load(data)["info"] + if (info["lsn"] != lsn): + synced = 0 + time.sleep(0.1) diff --git a/test/lib/tarantool_connection.py b/test/lib/tarantool_connection.py index 4b3c23513d61e1f037200f0f63959407495b3518..50653f8790f6865a6076afad32ec371bc3cc0fbd 100644 --- a/test/lib/tarantool_connection.py +++ b/test/lib/tarantool_connection.py @@ -64,9 +64,9 @@ class TarantoolConnection: else: self.reconnect() - def execute(self, command, noprint=True): + def execute(self, command, silent=True): self.opt_reconnect() - return self.execute_no_reconnect(command, noprint) + return self.execute_no_reconnect(command, silent) def write(self, fragment): """This is to support print >> admin, "command" syntax. diff --git a/test/lib/tarantool_server.py b/test/lib/tarantool_server.py index 41ec80f84a5a6adbb6d8c28bae36834beff2ccf8..280d80d7efcbda25072487f0db1ec1671213d556 100644 --- a/test/lib/tarantool_server.py +++ b/test/lib/tarantool_server.py @@ -46,13 +46,13 @@ class TarantoolServer(Server): config.readfp(TarantoolConfigFile(fp, dummy_section_name)) self.pidfile = config.get(dummy_section_name, "pid_file") - def reconfigure(self, config, noprint=False): + def reconfigure(self, config, silent=False): if config == None: os.unlink(os.path.join(self.vardir, self.default_config_name)) else: self.config = os.path.abspath(config) shutil.copy(self.config, os.path.join(self.vardir, self.default_config_name)) - self.admin.execute("reload configuration\n", noprint=noprint) + self.admin.execute("reload configuration\n", silent=silent) def version(self): p = subprocess.Popen([self.binary, "--version"],