diff --git a/cfg/tarantool_box_cfg.c b/cfg/tarantool_box_cfg.c index fd6e2d864a5a405cf491488b1367efdc387f8e3b..45414480216940f6f91fb035db0dc67ba7469af0 100644 --- a/cfg/tarantool_box_cfg.c +++ b/cfg/tarantool_box_cfg.c @@ -59,11 +59,10 @@ init_tarantool_cfg(tarantool_cfg *c) { c->rows_per_wal = 0; c->wal_fsync_delay = 0; c->wal_writer_inbox_size = 0; - c->local_hot_standby = 0; + c->hot_standby = 0; c->wal_dir_rescan_delay = 0; c->panic_on_snap_error = 0; c->panic_on_wal_error = 0; - c->remote_hot_standby = 0; c->replication_source_ipaddr = NULL; c->replication_source_port = 0; c->namespace = NULL; @@ -108,11 +107,10 @@ fill_default_tarantool_cfg(tarantool_cfg *c) { c->rows_per_wal = 500000; c->wal_fsync_delay = 0; c->wal_writer_inbox_size = 128; - c->local_hot_standby = 0; + c->hot_standby = 0; c->wal_dir_rescan_delay = 0.1; c->panic_on_snap_error = 1; c->panic_on_wal_error = 0; - c->remote_hot_standby = 0; c->replication_source_ipaddr = NULL; c->replication_source_port = 0; c->namespace = NULL; @@ -245,8 +243,8 @@ static NameAtom _name__wal_fsync_delay[] = { static NameAtom _name__wal_writer_inbox_size[] = { { "wal_writer_inbox_size", -1, NULL } }; -static NameAtom _name__local_hot_standby[] = { - { "local_hot_standby", -1, NULL } +static NameAtom _name__hot_standby[] = { + { "hot_standby", -1, NULL } }; static NameAtom _name__wal_dir_rescan_delay[] = { { "wal_dir_rescan_delay", -1, NULL } @@ -257,9 +255,6 @@ static NameAtom _name__panic_on_snap_error[] = { static NameAtom _name__panic_on_wal_error[] = { { "panic_on_wal_error", -1, NULL } }; -static NameAtom _name__remote_hot_standby[] = { - { "remote_hot_standby", -1, NULL } -}; static NameAtom _name__replication_source_ipaddr[] = { { "replication_source_ipaddr", -1, NULL } }; @@ -731,7 +726,7 @@ acceptValue(tarantool_cfg* c, OptDef* opt, int check_rdonly) { return CNF_RDONLY; c->wal_writer_inbox_size = i32; } - else if ( cmpNameAtoms( opt->name, _name__local_hot_standby) ) { + else if ( cmpNameAtoms( opt->name, _name__hot_standby) ) { if (opt->paramType != numberType ) return CNF_WRONGTYPE; c->__confetti_flags &= ~CNF_FLAG_STRUCT_NOTSET; @@ -741,9 +736,9 @@ acceptValue(tarantool_cfg* c, OptDef* opt, int check_rdonly) { return CNF_WRONGINT; if ( (i32 == LONG_MIN || i32 == LONG_MAX) && errno == ERANGE) return CNF_WRONGRANGE; - if (check_rdonly && c->local_hot_standby != i32) + if (check_rdonly && c->hot_standby != i32) return CNF_RDONLY; - c->local_hot_standby = i32; + c->hot_standby = i32; } else if ( cmpNameAtoms( opt->name, _name__wal_dir_rescan_delay) ) { if (opt->paramType != numberType ) @@ -785,18 +780,6 @@ acceptValue(tarantool_cfg* c, OptDef* opt, int check_rdonly) { return CNF_RDONLY; c->panic_on_wal_error = i32; } - else if ( cmpNameAtoms( opt->name, _name__remote_hot_standby) ) { - if (opt->paramType != numberType ) - return CNF_WRONGTYPE; - c->__confetti_flags &= ~CNF_FLAG_STRUCT_NOTSET; - errno = 0; - long int i32 = strtol(opt->paramValue.numberval, NULL, 10); - if (i32 == 0 && errno == EINVAL) - return CNF_WRONGINT; - if ( (i32 == LONG_MIN || i32 == LONG_MAX) && errno == ERANGE) - return CNF_WRONGRANGE; - c->remote_hot_standby = i32; - } else if ( cmpNameAtoms( opt->name, _name__replication_source_ipaddr) ) { if (opt->paramType != stringType ) return CNF_WRONGTYPE; @@ -1125,11 +1108,10 @@ typedef enum IteratorState { S_name__rows_per_wal, S_name__wal_fsync_delay, S_name__wal_writer_inbox_size, - S_name__local_hot_standby, + S_name__hot_standby, S_name__wal_dir_rescan_delay, S_name__panic_on_snap_error, S_name__panic_on_wal_error, - S_name__remote_hot_standby, S_name__replication_source_ipaddr, S_name__replication_source_port, S_name__namespace, @@ -1499,17 +1481,17 @@ tarantool_cfg_iterator_next(tarantool_cfg_iterator_t* i, tarantool_cfg *c, char } sprintf(*v, "%"PRId32, c->wal_writer_inbox_size); snprintf(buf, PRINTBUFLEN-1, "wal_writer_inbox_size"); - i->state = S_name__local_hot_standby; + i->state = S_name__hot_standby; return buf; - case S_name__local_hot_standby: + case S_name__hot_standby: *v = malloc(32); if (*v == NULL) { free(i); out_warning(CNF_NOMEMORY, "No memory to output value"); return NULL; } - sprintf(*v, "%"PRId32, c->local_hot_standby); - snprintf(buf, PRINTBUFLEN-1, "local_hot_standby"); + sprintf(*v, "%"PRId32, c->hot_standby); + snprintf(buf, PRINTBUFLEN-1, "hot_standby"); i->state = S_name__wal_dir_rescan_delay; return buf; case S_name__wal_dir_rescan_delay: @@ -1543,17 +1525,6 @@ tarantool_cfg_iterator_next(tarantool_cfg_iterator_t* i, tarantool_cfg *c, char } sprintf(*v, "%"PRId32, c->panic_on_wal_error); snprintf(buf, PRINTBUFLEN-1, "panic_on_wal_error"); - i->state = S_name__remote_hot_standby; - return buf; - case S_name__remote_hot_standby: - *v = malloc(32); - if (*v == NULL) { - free(i); - out_warning(CNF_NOMEMORY, "No memory to output value"); - return NULL; - } - sprintf(*v, "%"PRId32, c->remote_hot_standby); - snprintf(buf, PRINTBUFLEN-1, "remote_hot_standby"); i->state = S_name__replication_source_ipaddr; return buf; case S_name__replication_source_ipaddr: @@ -1906,11 +1877,10 @@ dup_tarantool_cfg(tarantool_cfg* dst, tarantool_cfg* src) { dst->rows_per_wal = src->rows_per_wal; dst->wal_fsync_delay = src->wal_fsync_delay; dst->wal_writer_inbox_size = src->wal_writer_inbox_size; - dst->local_hot_standby = src->local_hot_standby; + dst->hot_standby = src->hot_standby; dst->wal_dir_rescan_delay = src->wal_dir_rescan_delay; dst->panic_on_snap_error = src->panic_on_snap_error; dst->panic_on_wal_error = src->panic_on_wal_error; - dst->remote_hot_standby = src->remote_hot_standby; dst->replication_source_ipaddr = src->replication_source_ipaddr == NULL ? NULL : strdup(src->replication_source_ipaddr); if (src->replication_source_ipaddr != NULL && dst->replication_source_ipaddr == NULL) return CNF_NOMEMORY; @@ -2221,8 +2191,8 @@ cmp_tarantool_cfg(tarantool_cfg* c1, tarantool_cfg* c2, int only_check_rdonly) { return diff; } - if (c1->local_hot_standby != c2->local_hot_standby) { - snprintf(diff, PRINTBUFLEN - 1, "%s", "c->local_hot_standby"); + if (c1->hot_standby != c2->hot_standby) { + snprintf(diff, PRINTBUFLEN - 1, "%s", "c->hot_standby"); return diff; } @@ -2241,13 +2211,6 @@ cmp_tarantool_cfg(tarantool_cfg* c1, tarantool_cfg* c2, int only_check_rdonly) { return diff; } - if (!only_check_rdonly) { - if (c1->remote_hot_standby != c2->remote_hot_standby) { - snprintf(diff, PRINTBUFLEN - 1, "%s", "c->remote_hot_standby"); - - return diff; - } - } if (!only_check_rdonly) { if (confetti_strcmp(c1->replication_source_ipaddr, c2->replication_source_ipaddr) != 0) { snprintf(diff, PRINTBUFLEN - 1, "%s", "c->replication_source_ipaddr"); diff --git a/cfg/tarantool_box_cfg.cfg b/cfg/tarantool_box_cfg.cfg index 7de3ad3a159a97ae9005850a63adc079437c0fd1..66e9cb1f3e277ee0b87a4336c72bebbb10bc6063 100644 --- a/cfg/tarantool_box_cfg.cfg +++ b/cfg/tarantool_box_cfg.cfg @@ -103,9 +103,9 @@ wal_fsync_delay = 0 # size of WAL writer request buffer wal_writer_inbox_size = 128 -# Local hot standby (if enabled, the server will run in local hot standby -# mode, continuously fetching WAL records from shared local directory). -local_hot_standby = 0 +# Hot standby (if enabled, the server will run in hot standby +# mode, continuously fetching WAL records from shared directory). +hot_standby = 0 # Delay, in seconds, between successive re-readings of wal_dir. # The re-scan is necessary to discover new WAL files or snapshots. @@ -117,10 +117,9 @@ wal_dir_rescan_delay = 0.1 panic_on_snap_error = 1 panic_on_wal_error = 0 -# Remote hot standby (if enabled, the server will run in hot standby mode +# Replicator mode (if enabled, the server will run in replication mode # continuously fetching WAL records from # replication_source_ipaddr:replication_source_port -remote_hot_standby = 0 replication_source_ipaddr = NULL replication_source_port = 0 namespace = [ { diff --git a/cfg/tarantool_box_cfg.h b/cfg/tarantool_box_cfg.h index 9122fb36795d0d33359ecea5f064853624f5edba..dcf0dd4dea6809844763d6f983f0606bcfb14d75 100644 --- a/cfg/tarantool_box_cfg.h +++ b/cfg/tarantool_box_cfg.h @@ -2,6 +2,7 @@ #define tarantool_cfg_CFG_H #include <stdio.h> +#include <stdbool.h> #include <sys/types.h> /* @@ -157,10 +158,10 @@ typedef struct tarantool_cfg { int32_t wal_writer_inbox_size; /* - * Local hot standby (if enabled, the server will run in local hot standby - * mode, continuously fetching WAL records from shared local directory). + * Hot standby (if enabled, the server will run in hot standby + * mode, continuously fetching WAL records from shared directory). */ - int32_t local_hot_standby; + int32_t hot_standby; /* * Delay, in seconds, between successive re-readings of wal_dir. @@ -177,11 +178,10 @@ typedef struct tarantool_cfg { int32_t panic_on_wal_error; /* - * Remote hot standby (if enabled, the server will run in hot standby mode + * Replicator mode (if enabled, the server will run in replication mode * continuously fetching WAL records from * replication_source_ipaddr:replication_source_port */ - int32_t remote_hot_standby; char* replication_source_ipaddr; int32_t replication_source_port; tarantool_cfg_namespace** namespace; diff --git a/core/fiber.m b/core/fiber.m index 02de9b510ced918c2e3cbfc56d11973a4864c6a0..c9c4546f22fef93c92d8e716186812e3bda17017 100644 --- a/core/fiber.m +++ b/core/fiber.m @@ -258,7 +258,7 @@ wait_for_child(pid_t pid) } -static void +void fiber_io_start(int events) { ev_io *io = &fiber->io; @@ -272,7 +272,7 @@ fiber_io_start(int events) /** @note: this is a cancellation point. */ -static void +void fiber_io_yield() { assert(ev_is_active(&fiber->io)); @@ -287,7 +287,7 @@ fiber_io_yield() } } -static void +void fiber_io_stop(int events) { ev_io *io = &fiber->io; @@ -1092,8 +1092,9 @@ tcp_server_handler(void *data) exit(EX_OSERR); } - if (server->on_bind != NULL) + if (server->on_bind != NULL) { server->on_bind(server->data); + } fiber_io_start(EV_READ); for (;;) { @@ -1126,68 +1127,6 @@ tcp_server_handler(void *data) say_syserror("accept"); continue; } - exit(EX_OSERR); - } - - if (set_nonblock(fiber->fd) == -1) - exit(EX_OSERR); - - memset(&sin, 0, sizeof(struct sockaddr_in)); - sin.sin_family = AF_INET; - sin.sin_port = htons(server->port); - sin.sin_addr.s_addr = INADDR_ANY; - - for (;;) { - if (bind(fiber->fd, (struct sockaddr *)&sin, sizeof(sin)) == -1) { - if (errno == EADDRINUSE) - goto sleep_and_retry; - say_syserror("bind"); - exit(EX_OSERR); - } - - say_info("bound to UDP port %i", server->port); - break; - - sleep_and_retry: - if (!warning_said) { - say_warn("port %i is already in use, " - "will retry binding after 0.1 seconds.", server->port); - warning_said = true; - } - fiber_sleep(0.1); - } - - if (server->on_bind != NULL) - server->on_bind(server->data); - - fiber_io_start(EV_READ); - for (;;) { -#define MAXUDPPACKETLEN 128 - char buf[MAXUDPPACKETLEN]; - struct sockaddr_in addr; - socklen_t addrlen; - ssize_t sz; - - fiber_io_yield(); - - for (;;) { - addrlen = sizeof(addr); - sz = recvfrom(fiber->fd, buf, MAXUDPPACKETLEN, MSG_DONTWAIT, - (struct sockaddr *)&addr, &addrlen); - - if (sz <= 0) { - if (!(errno == EAGAIN || errno == EWOULDBLOCK)) - say_syserror("recvfrom"); - break; - } else { - if (server->handler) { - server->handler(data); - } else { - void (*f) (char *, int) = data; - f(buf, (int)sz); - } - } - } } fiber_io_stop(EV_READ); } @@ -1222,7 +1161,6 @@ create_socket(struct fiber *fiber, fiber_server_type type) int sock_domain = -1; int sock_type = -1; int sock_protocol = -1; - bool is_stream_sock = false; int one = 1; struct linger ling = { 0, 0 }; @@ -1236,13 +1174,6 @@ create_socket(struct fiber *fiber, fiber_server_type type) sock_domain = AF_INET; sock_type = SOCK_STREAM; sock_protocol = IPPROTO_TCP; - is_stream_sock = true; - break; - case udp_server: - sock_domain = AF_INET; - sock_type = SOCK_DGRAM; - sock_protocol = IPPROTO_UDP; - is_stream_sock = false; break; default: say_error("invalid socket type"); @@ -1260,13 +1191,11 @@ create_socket(struct fiber *fiber, fiber_server_type type) goto create_socket_fail; } - if (is_stream_sock) { - if (setsockopt(fiber->fd, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one)) != 0 || - setsockopt(fiber->fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) != 0 || - setsockopt(fiber->fd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling)) != 0) { - say_syserror("setsockopt"); - goto create_socket_fail; - } + if (setsockopt(fiber->fd, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one)) != 0 || + setsockopt(fiber->fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) != 0 || + setsockopt(fiber->fd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling)) != 0) { + say_syserror("setsockopt"); + goto create_socket_fail; } if (set_nonblock(fiber->fd) == -1) { diff --git a/core/replicator.m b/core/replicator.m index 5bd723fdf94788b2e25a915088ae2bf48ca04478..9cedcb2fa3141c40fd4f9bd948a4b8311f438083 100644 --- a/core/replicator.m +++ b/core/replicator.m @@ -48,6 +48,8 @@ struct replicator_process { bool need_waitpid; /** replicator is done */ bool is_done; + /** child process counts */ + u32 child_count; }; @@ -107,11 +109,16 @@ spawner_init(int sock); /** * Replicator spawner process main loop. - * */ static void spawner_main_loop(); +/** + * Replicator spawner shutdown. + */ +static void +spawner_shutdown(); + /** * Replicator's spawner process signals handler. * @@ -121,10 +128,10 @@ static void spawner_signals_handler(int signal); /** - * Process finished childs. + * Process waitpid childs. */ static void -spawner_process_finished_childs(); +spawner_process_wait_childs(); /** * Receive replication client socket from main process. @@ -142,6 +149,18 @@ spawner_recv_client_sock(int *client_sock); static int spawner_create_client_handler(int client_sock); +/** + * Replicator spawner shutdown: kill childs. + */ +static void +spawner_shutdown_kill_childs(); + +/** + * Replicator spawner shutdown: wait childs. + */ +static int +spawner_shutdown_wait_childs(); + /** * Initialize replicator's service process. */ @@ -154,6 +173,7 @@ client_handler_init(int client_sock); static int client_handler_send_row(struct recovery_state *r __attribute__((unused)), struct tbuf *t); + /*-----------------------------------------------------------------------------*/ /* replicatior module */ /*-----------------------------------------------------------------------------*/ @@ -264,21 +284,20 @@ acceptor_handler(void *data) struct fiber *sender = (struct fiber *) data; struct tbuf *msg; - if (fiber_serv_socket(fiber, tcp_server, cfg.replication_port, false, 0.0) != 0) { + if (fiber_serv_socket(fiber, tcp_server, cfg.replication_port, true, 0.1) != 0) { panic("can not bind replication port"); } msg = tbuf_alloc(fiber->pool); - fiber_io_start(EV_READ); for (;;) { struct sockaddr_in addr; socklen_t addrlen = sizeof(addr); int client_sock = -1; /* wait new connection request */ + fiber_io_start(EV_READ); fiber_io_yield(); - /* accept connection */ client_sock = accept(fiber->fd, &addr, &addrlen); if (client_sock == -1) { @@ -287,6 +306,7 @@ acceptor_handler(void *data) } panic_syserror("accept"); } + fiber_io_stop(EV_READ); say_info("connection from %s:%d", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port)); /* put descriptor to message and send to sender fiber */ @@ -294,9 +314,7 @@ acceptor_handler(void *data) write_inbox(sender, msg); /* clean-up buffer & wait while sender fiber read message */ tbuf_reset(msg); - wait_inbox(sender); } - fiber_io_stop(EV_READ); } /** Replication sender fiber. */ @@ -361,11 +379,13 @@ sender_send_sock(int client_sock) *((int *) CMSG_DATA(control_message)) = client_sock; /* wait, when interprocess comm. socke will ready for write */ - wait_for(EV_WRITE); + fiber_io_start(EV_WRITE); + fiber_io_yield(); /* send client socket to replicator porcess */ if (sendmsg(fiber->fd, &msg, 0) < 0) { say_syserror("sendmsg"); } + fiber_io_stop(EV_WRITE); /* close client sock in the main process */ close(client_sock); } @@ -395,6 +415,7 @@ spawner_init(int sock) replicator_process.sock = sock; replicator_process.need_waitpid = false; replicator_process.is_done = false; + replicator_process.child_count = 0; /* init signals */ memset(&sa, 0, sizeof(sa)); @@ -427,7 +448,7 @@ spawner_main_loop() int client_sock; if (replicator_process.need_waitpid) { - spawner_process_finished_childs(); + spawner_process_wait_childs(); } if (spawner_recv_client_sock(&client_sock) != 0) { @@ -439,7 +460,20 @@ spawner_main_loop() } } + spawner_shutdown(); +} + +/** Replicator spawner shutdown. */ +static void +spawner_shutdown() +{ + say_info("shutdown"); + + /* kill all childs */ + spawner_shutdown_kill_childs(); + /* close socket */ close(replicator_process.sock); + exit(EXIT_SUCCESS); } @@ -458,11 +492,11 @@ static void spawner_signals_handler(int signal) } } -/** Process finished childs. */ +/** Process waitpid childs. */ static void -spawner_process_finished_childs() +spawner_process_wait_childs() { - while (true) { + while (replicator_process.child_count > 0) { int exit_status; pid_t pid; @@ -477,7 +511,8 @@ spawner_process_finished_childs() break; } - say_debug("child finished: pid = %d, exit status = %d", pid, exit_status); + say_info("child finished: pid = %d, exit status = %d", pid, WEXITSTATUS(exit_status)); + replicator_process.child_count--; } replicator_process.need_waitpid = false; @@ -539,17 +574,106 @@ spawner_create_client_handler(int client_sock) if (pid == 0) { client_handler_init(client_sock); } else { + say_info("replicator client handler spawned: pid = %d", pid); + replicator_process.child_count++; close(client_sock); } return 0; } +/** Replicator spawner shutdown: kill childs. */ +static void +spawner_shutdown_kill_childs() +{ + int result = 0; + + /* check child process count */ + if (replicator_process.child_count == 0) { + return; + } + + /* send terminate signals to childs */ + say_info("send SIGTERM to %"PRIu32" childs", replicator_process.child_count); + result = kill(0, SIGTERM); + if (result != 0) { + say_syserror("kill"); + return; + } + + /* wait when process is down */ + result = spawner_shutdown_wait_childs(); + if (result != 0) { + return; + } + + /* check child process count */ + if (replicator_process.child_count == 0) { + say_info("all childs terminated"); + return; + } + say_info("%"PRIu32" childs still alive", replicator_process.child_count); + + /* send terminate signals to childs */ + say_info("send SIGKILL to %"PRIu32" childs", replicator_process.child_count); + result = kill(0, SIGKILL); + if (result != 0) { + say_syserror("kill"); + return; + } + + /* wait when process is down */ + result = spawner_shutdown_wait_childs(); + if (result != 0) { + return; + } + say_info("all childs terminated"); +} + +/** Replicator spawner shutdown: wait childs. */ +static int +spawner_shutdown_wait_childs() +{ + const u32 wait_sec = 5; + struct timeval tv; + + say_info("wait for childs %"PRIu32" seconds", wait_sec); + + tv.tv_sec = wait_sec; + tv.tv_usec = 0; + + /* wait childs process */ + spawner_process_wait_childs(); + while (replicator_process.child_count > 0) { + int result; + + /* wait EINTR or timeout */ + result = select(0, NULL, NULL, NULL, &tv); + if (result < 0 && errno != EINTR) { + /* this is not signal */ + say_syserror("select"); + return - 1; + } + + /* wait childs process */ + spawner_process_wait_childs(); + + /* check timeout */ + if (tv.tv_sec == 0 && tv.tv_usec == 0) { + /* timeout happen */ + break; + } + } + + return 0; +} + /** Initialize replicator's service process. */ static void client_handler_init(int client_sock) { char name[sizeof(fiber->name)]; + struct sigaction sa; struct recovery_state *log_io; struct tbuf *ver; i64 lsn; @@ -558,11 +682,31 @@ client_handler_init(int client_sock) fiber->has_peer = true; fiber->fd = client_sock; + /* set replicator name */ memset(name, 0, sizeof(name)); snprintf(name, sizeof(name), "replicator%s/handler", cfg.replicator_custom_proc_title); fiber_set_name(fiber, name); set_proc_title("%s %s", name, fiber_peer_name(fiber)); + /* init signals */ + memset(&sa, 0, sizeof(sa)); + + /* ignore SIGPIPE and SIGCHLD */ + sa.sa_handler = SIG_IGN; + if ((sigaction(SIGPIPE, &sa, NULL) == -1) || + (sigaction(SIGCHLD, &sa, NULL) == -1)) { + say_syserror("sigaction"); + } + + /* return signals SIGHUP, SIGINT and SIGTERM to default value */ + sa.sa_handler = SIG_DFL; + + if ((sigaction(SIGHUP, &sa, NULL) == -1) || + (sigaction(SIGINT, &sa, NULL) == -1) || + (sigaction(SIGTERM, &sa, NULL) == -1)) { + say_syserror("sigaction"); + } + ev_default_loop(0); r = read(fiber->fd, &lsn, sizeof(lsn)); @@ -572,13 +716,14 @@ client_handler_init(int client_sock) } panic("invalid lns request size: %zu", r); } + say_info("start recover from lsn:%"PRIi64, lsn); ver = tbuf_alloc(fiber->pool); tbuf_append(ver, &default_version, sizeof(default_version)); client_handler_send_row(NULL, ver); log_io = recover_init(NULL, cfg.wal_dir, - NULL, client_handler_send_row, INT32_MAX, 0, 64, RECOVER_READONLY, false); + client_handler_send_row, INT32_MAX, 0, 64, RECOVER_READONLY, false); recover(log_io, lsn); recover_follow(log_io, 0.1); @@ -596,6 +741,11 @@ client_handler_send_row(struct recovery_state *r __attribute__((unused)), struct while (len > 0) { bytes = write(fiber->fd, data, len); if (bytes < 0) { + if (errno == EPIPE) { + /* socket closed on opposite site */ + say_info("replication socket closed on opposite side, exit"); + exit(EXIT_SUCCESS); + } panic_syserror("write"); } len -= bytes; @@ -603,7 +753,6 @@ client_handler_send_row(struct recovery_state *r __attribute__((unused)), struct } say_debug("send row: %" PRIu32 " bytes %s", t->len, tbuf_to_hex(t)); - return 0; } diff --git a/include/fiber.h b/include/fiber.h index 4a11eb4e988b1fe7d433bbc8c2d7dab492dfd44c..a8bb77d628c946fa7b77558cfee9f6c798612c90 100644 --- a/include/fiber.h +++ b/include/fiber.h @@ -129,6 +129,17 @@ void fiber_init(void); struct fiber *fiber_create(const char *name, int fd, int inbox_size, void (*f) (void *), void *); void fiber_set_name(struct fiber *fiber, const char *name); void wait_for_child(pid_t pid); + +void +fiber_io_start(int events); + +void +fiber_io_yield(); + +void +fiber_io_stop(int events); + + void yield(void); void fiber_destroy_all(); diff --git a/mod/box/box.m b/mod/box/box.m index c00618428d685dc64c047ad5b7d4851d4f9827a9..bcfbfcc8b8a95faff8a844a9d3c691cbfa009591 100644 --- a/mod/box/box.m +++ b/mod/box/box.m @@ -1230,8 +1230,11 @@ static void remote_recovery_restart(struct tarantool_cfg *conf) { if (remote_recover) { + /* Another replication fiber started, stop it */ say_info("shutting down the replica"); + fiber_cancel(remote_recover); fiber_call(remote_recover); + remote_recover = NULL; } say_info("starting the replica"); @@ -1241,16 +1244,14 @@ remote_recovery_restart(struct tarantool_cfg *conf) default_remote_row_handler); status = palloc(eter_pool, 64); - snprintf(status, 64, "replica/%s:%i%s", conf->wal_feeder_ipaddr, - conf->wal_feeder_port, custom_proc_title); - title("replica/%s:%i%s", conf->wal_feeder_ipaddr, conf->wal_feeder_port, - custom_proc_title); + snprintf(status, 64, "replica/%s:%i%s", conf->replication_source_ipaddr, conf->replication_source_port, custom_proc_title); + title("replica/%s:%i%s", conf->replication_source_ipaddr, conf->replication_source_port, custom_proc_title); } static void box_master_or_slave(struct tarantool_cfg *conf) { - if (conf->remote_hot_standby) { + if (conf->replication_source_port) { rw_callback = box_process_ro; recovery_wait_lsn(recovery_state, recovery_state->lsn); @@ -1294,10 +1295,24 @@ memcached_bound_to_primary(void *data __attribute__((unused))) i32 mod_check_config(struct tarantool_cfg *conf) { - if (conf->remote_hot_standby > 0 && conf->local_hot_standby > 0) { + bool is_replica = (bool)conf->replication_source_port; + + if (is_replica) { + if (conf->replication_source_port < 0 || conf->replication_source_port > USHRT_MAX) { + out_warning(0, "invalid replication source port value: %"PRId32, + conf->replication_source_port); + } + + if (conf->replication_source_ipaddr == 0) { + out_warning(0, "replication source ip address not defined"); + return -1; + } + + } + + if (is_replica && conf->hot_standby) { out_warning(0, "Remote and local hot standby modes " "can't be enabled simultaneously"); - return -1; } @@ -1307,25 +1322,22 @@ mod_check_config(struct tarantool_cfg *conf) i32 mod_reload_config(struct tarantool_cfg *old_conf, struct tarantool_cfg *new_conf) { - if (old_conf->remote_hot_standby != new_conf->remote_hot_standby) { - if (recovery_state->finalize != true) { - out_warning(0, "Could not propagate %s before local recovery finished", - old_conf->remote_hot_standby == true ? "slave to master" : - "master to slave"); + bool old_is_replica = (bool)old_conf->replication_source_port; + bool new_is_replica = (bool)new_conf->replication_source_port; + if (old_is_replica != new_is_replica) { + if (!recovery_state->finalize) { + out_warning(0, "Could not propagate %s before local recovery finished", + old_is_replica ? "slave to master" : "master to slave"); return -1; } - } - - if (old_conf->remote_hot_standby != new_conf->remote_hot_standby) { - /* Local recovery must be finalized at this point */ - assert(recovery_state->finalize == true); box_master_or_slave(new_conf); - } else if (old_conf->remote_hot_standby > 0 && - (strcmp(old_conf->wal_feeder_ipaddr, new_conf->wal_feeder_ipaddr) != 0 || - old_conf->wal_feeder_port != new_conf->wal_feeder_port)) + } else if (old_is_replica > 0 && + (strcmp(old_conf->replication_source_ipaddr, new_conf->replication_source_ipaddr) != 0 || + old_conf->replication_source_port != new_conf->replication_source_port)) { remote_recovery_restart(new_conf); + } return 0; } @@ -1355,7 +1367,7 @@ mod_init(void) if (cfg.memcached != 0) { if (cfg.secondary_port != 0) panic("in memcached mode secondary_port must be 0"); - if (cfg.remote_hot_standby) + if (cfg.replication_source_port) panic("remote replication is not supported in memcached mode."); memcached_init(); @@ -1363,9 +1375,9 @@ mod_init(void) title("loading"); - if (cfg.remote_hot_standby) { + if (cfg.replication_source_port) { if (cfg.replication_source_ipaddr == NULL || cfg.replication_source_port == 0) - panic("replication_source_ipaddr & replication_source_port must be provided in remote_hot_standby mode"); + panic("replication_source_ipaddr & replication_source_port must be provided in replication mode"); } recovery_state = recover_init(cfg.snap_dir, cfg.wal_dir, @@ -1420,7 +1432,7 @@ mod_init(void) title("orphan"); - if (cfg.local_hot_standby) { + if (cfg.hot_standby) { say_info("starting local hot standby"); recover_follow(recovery_state, cfg.wal_dir_rescan_delay); status = "hot_standby"; diff --git a/mod/box/box_cfg.cfg_tmpl b/mod/box/box_cfg.cfg_tmpl index ecd3b612917367ec9b0d399f19076389f8834803..f4269f714146dc855740651aabee57a2cd00477b 100644 --- a/mod/box/box_cfg.cfg_tmpl +++ b/mod/box/box_cfg.cfg_tmpl @@ -43,9 +43,9 @@ wal_fsync_delay=0, ro # size of WAL writer request buffer wal_writer_inbox_size=128, ro -# Local hot standby (if enabled, the server will run in local hot standby -# mode, continuously fetching WAL records from shared local directory). -local_hot_standby=0, ro +# Hot standby (if enabled, the server will run in hot standby +# mode, continuously fetching WAL records from shared directory). +hot_standby=0, ro # Delay, in seconds, between successive re-readings of wal_dir. # The re-scan is necessary to discover new WAL files or snapshots. wal_dir_rescan_delay=0.1, ro @@ -57,10 +57,9 @@ wal_dir_rescan_delay=0.1, ro panic_on_snap_error=1, ro panic_on_wal_error=0, ro -# Remote hot standby (if enabled, the server will run in hot standby mode +# Replicator mode (if enabled, the server will run in replication mode # continuously fetching WAL records from # replication_source_ipaddr:replication_source_port -remote_hot_standby=0 replication_source_ipaddr=NULL replication_source_port=0 diff --git a/test/box/show.result b/test/box/show.result index 1fd1274a317e44164ea156a9d2bd6b9b5d324bba..ed0809771b98e4f5cc506e21b19771d31287b17f 100644 --- a/test/box/show.result +++ b/test/box/show.result @@ -57,11 +57,10 @@ configuration: rows_per_wal: "50" wal_fsync_delay: "0" wal_writer_inbox_size: "128" - local_hot_standby: "0" + hot_standby: "0" wal_dir_rescan_delay: "0.1" panic_on_snap_error: "1" panic_on_wal_error: "0" - remote_hot_standby: "0" replication_source_ipaddr: (null) replication_source_port: "0" namespace[0].enabled: "1" diff --git a/test/box_replication/common.test b/test/box_replication/common.test index 1385f57f3d81c17825020e3274736c42158886ac..472d46180d141744394032cc1fb68d9e04489b3c 100644 --- a/test/box_replication/common.test +++ b/test/box_replication/common.test @@ -58,7 +58,7 @@ for i in range(1, 10): beholder.sql.execute("select * from t0 where k0 = {0}".format(i), silent=False) -slave.wait_sync(10) +slave.wait_lsn(10) print """ #