diff --git a/src/box/box.cc b/src/box/box.cc index 98faad921d00d4ee82f7ac3c827ea4cab2158e80..15cba5606dfc0cb85c2322a50476cb6f63d28ad2 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -433,7 +433,6 @@ box_init() { box_check_config(); - replication_prefork(cfg_gets("snap_dir"), cfg_gets("wal_dir")); stat_init(); stat_base = stat_register(iproto_type_strs, IPROTO_TYPE_STAT_MAX); diff --git a/src/box/recovery.cc b/src/box/recovery.cc index 1a431db157fc7fe93fe7670e6048869ca5d06a52..39cf403471dbce4a3fcc7a35972814387db73dc8 100644 --- a/src/box/recovery.cc +++ b/src/box/recovery.cc @@ -150,9 +150,6 @@ wal_writer_start(struct recovery_state *state, int rows_per_wal); void wal_writer_stop(struct recovery_state *r); -static void -recovery_stop_local(struct recovery_state *r); - /** * Throws an exception in case of error. */ @@ -598,7 +595,7 @@ recovery_follow_local(struct recovery_state *r, fiber_start(r->watcher, r, wal_dir_rescan_delay); } -static void +void recovery_stop_local(struct recovery_state *r) { if (r->watcher) { diff --git a/src/box/recovery.h b/src/box/recovery.h index 3a0af00a4f1ef36eb753fe29db39151b45f8519e..9ccc852e9f712c0b2546363d7404c29d6ff1047f 100644 --- a/src/box/recovery.h +++ b/src/box/recovery.h @@ -65,13 +65,8 @@ extern const char *wal_mode_STRS[]; struct relay { /** Replica connection */ int sock; - /* Request type - SUBSCRIBE or JOIN */ - uint32_t type; /* Request sync */ uint64_t sync; - /* Only used in SUBSCRIBE request */ - uint32_t server_id; - struct vclock vclock; }; enum { REMOTE_SOURCE_MAXLEN = 1024 }; /* enough to fit URI with passwords */ @@ -149,6 +144,8 @@ recovery_has_data(struct recovery_state *r) void recovery_bootstrap(struct recovery_state *r); void recover_snap(struct recovery_state *r); void recovery_follow_local(struct recovery_state *r, ev_tstamp wal_dir_rescan_delay); +void recovery_stop_local(struct recovery_state *r); + void recovery_finalize(struct recovery_state *r, int rows_per_wal); int64_t wal_write(struct recovery_state *r, struct xrow_header *packet); diff --git a/src/box/replication.cc b/src/box/replication.cc index 66ec9bfa6b43d38474b4c8e13e7aee439063cecb..49061b9c9eee119541c5bfe656ac7a22b2513600 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -29,19 +29,7 @@ #include "replication.h" #include <say.h> #include <fiber.h> -#include <stddef.h> -#include <stddef.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <sys/wait.h> -#include <sys/uio.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include <limits.h> -#include <fcntl.h> -#include "tarantool.h" -#include "fiber.h" #include "recovery.h" #include "xlog.h" #include "evio.h" @@ -52,154 +40,13 @@ #include "scoped_guard.h" #include "xrow.h" #include "coeio.h" +#include "coio.h" +#include "cfg.h" -/** Replication topology - * ---------------------- - * - * Tarantool replication consists of 3 interacting processes: - * master, spawner and replication relay. - * - * The spawner is created at server start, and master communicates - * with the spawner using a socketpair(2). Replication relays are - * created by the spawner and handle one client connection each. - * - * The master process binds to the primary port and accepts - * incoming connections. This is done in the master to be able to - * correctly handle authentication of replication clients. - * - * Once a client socket is accepted, it is sent to the spawner - * process, through the master's end of the socket pair. - * - * The spawner listens on the receiving end of the socket pair and - * for every received socket creates a replication relay, which is - * then responsible for sending write ahead logs to the replica. - * - * Upon shutdown, the master closes its end of the socket pair. - * The spawner then reads EOF from its end, terminates all - * children and exits. - */ -static int master_to_spawner_socket; -static char cfg_wal_dir[PATH_MAX]; -static char cfg_snap_dir[PATH_MAX]; - -/** Send a file descriptor to replication relay spawner. - * - * Invoked when spawner's end of the socketpair becomes ready. - */ static void -replication_send_socket(ev_loop *loop, ev_io *watcher, int /* events */); -static void -replication_relay_send_row(struct recovery_state *r, void * /* param */, +replication_send_row(struct recovery_state *r, void * /* param */, struct xrow_header *packet); -/** Replication spawner process */ -static struct spawner { - /** reading end of the socket pair with the master */ - int sock; - /** non-zero if got a terminating signal */ - sig_atomic_t killed; - /** child process count */ - sig_atomic_t child_count; -} spawner; - -/** Initialize spawner process. - * - * @param sock the socket between the main process and the spawner. - */ -static void -spawner_init(int sock); - -/** Spawner main loop. */ -static void -spawner_main_loop(); - -/** Shutdown spawner and all its children. */ -static void -spawner_shutdown(); - -/** Handle SIGINT, SIGTERM, SIGHUP. */ -static void -spawner_signal_handler(int signal); - -/** Handle SIGCHLD: collect status of a terminated child. */ -static void -spawner_sigchld_handler(int signal __attribute__((unused))); - -/** Create a replication relay. - * - * @return 0 on success, -1 on error - */ -static int -spawner_create_replication_relay(struct relay *relay); - -/** Shut down all relays when shutting down the spawner. */ -static void -spawner_shutdown_children(); - -/** Initialize replication relay process. */ -static void -replication_relay_loop(struct relay *relay); - -/* - * ------------------------------------------------------------------------ - * replication module - * ------------------------------------------------------------------------ - */ - -/** Pre-fork replication spawner process. */ -void -replication_prefork(const char *snap_dir, const char *wal_dir) -{ - snprintf(cfg_snap_dir, sizeof(cfg_snap_dir), "%s", snap_dir); - snprintf(cfg_wal_dir, sizeof(cfg_wal_dir), "%s", wal_dir); - int sockpair[2]; - /* - * Create UNIX sockets to communicate between the main and - * spawner processes. - */ - if (socketpair(PF_LOCAL, SOCK_STREAM, 0, sockpair) != 0) - panic_syserror("socketpair"); - assert(sockpair[0] != STDOUT_FILENO && sockpair[0] != STDERR_FILENO); - - /* flush buffers to avoid multiple output */ - /* https://github.com/tarantool/tarantool/issues/366 */ - fflush(stdout); - fflush(stderr); - /* create spawner */ - pid_t pid = fork(); - if (pid == -1) - panic_syserror("fork"); - - if (pid != 0) { - /* parent process: tarantool */ - close(sockpair[1]); - master_to_spawner_socket = sockpair[0]; - sio_setfl(master_to_spawner_socket, O_NONBLOCK, 1); - } else { - ev_loop_fork(loop()); - ev_run(loop(), EVRUN_NOWAIT); - /* child process: spawner */ - close(sockpair[0]); - /* - * Move to an own process group, to not receive - * signals from the controlling tty. - */ - setpgid(0, 0); - spawner_init(sockpair[1]); - } -} - -/*-----------------------------------------------------------------------------*/ -/* replication accept/sender fibers */ -/*-----------------------------------------------------------------------------*/ - -/** State of a replication request - master process. */ -struct replication_request { - struct ev_io io; - int fd; - struct relay data; -}; - /** Replication acceptor fiber handler. */ static void * replication_join_thread(void *arg) @@ -233,15 +80,13 @@ void replication_join(int fd, struct xrow_header *packet) { struct recovery_state *r; - r = recovery_new(cfg_snap_dir, cfg_wal_dir, - replication_relay_send_row, NULL); + r = recovery_new(cfg_gets("snap_dir"), cfg_gets("wal_dir"), + replication_send_row, NULL); auto recovery_guard = make_scoped_guard([&]{ recovery_delete(r); }); r->relay.sock = fd; r->relay.sync = packet->sync; - r->relay.server_id = packet->server_id; - r->relay.type = IPROTO_JOIN; char name[FIBER_NAME_MAX]; struct sockaddr_storage peer; @@ -255,419 +100,139 @@ replication_join(int fd, struct xrow_header *packet) cord_cojoin(&cord); } -/** Replication acceptor fiber handler. */ -void -replication_subscribe(int fd, struct xrow_header *packet) -{ - struct tt_uuid uu = uuid_nil, server_uuid = uuid_nil; - - struct vclock vclock; - vclock_create(&vclock); - xrow_decode_subscribe(packet, &uu, &server_uuid, &vclock); - - /** - * Check that the given UUID matches the UUID of the - * cluster this server belongs to. Used to handshake - * replica connect, and refuse a connection from a replica - * which belongs to a different cluster. - */ - if (!tt_uuid_is_equal(&uu, &cluster_id)) { - tnt_raise(ClientError, ER_CLUSTER_ID_MISMATCH, - tt_uuid_str(&uu), tt_uuid_str(&cluster_id)); - } - - /* Check server uuid */ - uint32_t server_id; - server_id = schema_find_id(SC_CLUSTER_ID, 1, - tt_uuid_str(&server_uuid), UUID_STR_LEN); - if (server_id == SC_ID_NIL) { - tnt_raise(ClientError, ER_UNKNOWN_SERVER, - tt_uuid_str(&server_uuid)); - } - - struct replication_request *request = (struct replication_request *) - calloc(1, sizeof(*request)); - if (request == NULL) { - tnt_raise(ClientError, ER_MEMORY_ISSUE, sizeof(*request), - "iproto", "SUBSCRIBE"); - } - vclock_create(&request->data.vclock); - vclock_copy(&request->data.vclock, &vclock); - - request->fd = fd; - request->io.data = request; - request->data.type = packet->type; - request->data.sync = packet->sync; - request->data.server_id = server_id; - - ev_io_init(&request->io, replication_send_socket, - master_to_spawner_socket, EV_WRITE); - ev_io_start(loop(), &request->io); -} - - -/** Send a file descriptor to the spawner. */ -static void -replication_send_socket(ev_loop *loop, ev_io *watcher, int /* events */) -{ - struct replication_request *request = - (struct replication_request *) watcher->data; - struct msghdr msg; - struct iovec iov[2]; - char control_buf[CMSG_SPACE(sizeof(int))]; - memset(control_buf, 0, sizeof(control_buf)); /* valgrind */ - struct cmsghdr *control_message = NULL; - - size_t len = sizeof(request->data); - iov[0].iov_base = &len; - iov[0].iov_len = sizeof(len); - iov[1].iov_base = &request->data; - iov[1].iov_len = len; - - memset(&msg, 0, sizeof(msg)); - - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = iov; - msg.msg_iovlen = nelem(iov); - msg.msg_control = control_buf; - msg.msg_controllen = sizeof(control_buf); - - control_message = CMSG_FIRSTHDR(&msg); - control_message->cmsg_len = CMSG_LEN(sizeof(int)); - control_message->cmsg_level = SOL_SOCKET; - control_message->cmsg_type = SCM_RIGHTS; - *((int *) CMSG_DATA(control_message)) = request->fd; - - /* Send the client socket to the spawner. */ - if (sendmsg(master_to_spawner_socket, &msg, 0) < 0) - say_syserror("sendmsg"); - - ev_io_stop(loop, watcher); - /* Close client socket in the main process. */ - close(request->fd); - free(request); -} - - -/*--------------------------------------------------------------------------* - * spawner process * - * -------------------------------------------------------------------------*/ - -/** Initialize the spawner. */ - +/** + * A libev callback invoked when a relay client socket is ready + * for read. This currently only happens when the client closes + * its socket, and we get an EOF. + */ static void -spawner_init(int sock) +replication_subscribe_f(va_list ap) { - struct sigaction sa; - - title("spawner", NULL); - fiber_set_name(fiber(), status); + struct recovery_state *r = va_arg(ap, struct recovery_state *); - /* init replicator process context */ - spawner.sock = sock; - - /* init signals */ - memset(&sa, 0, sizeof(sa)); - sigemptyset(&sa.sa_mask); - - /* - * The spawner normally does not receive any signals, - * except when sent by a system administrator. - * When the master process terminates, it closes its end - * of the socket pair and this signals to the spawner that - * it's time to die as well. But before exiting, the - * spawner must kill and collect all active replication - * relays. This is why we need to change the default - * signal action here. - */ - sa.sa_handler = spawner_signal_handler; - - if (sigaction(SIGHUP, &sa, NULL) == -1 || - sigaction(SIGINT, &sa, NULL) == -1 || - sigaction(SIGTERM, &sa, NULL) == -1) - say_syserror("sigaction"); - - sa.sa_handler = spawner_sigchld_handler; - - if (sigaction(SIGCHLD, &sa, NULL) == -1) - say_syserror("sigaction"); - - sa.sa_handler = SIG_IGN; + recovery_follow_local(r, 0.1); /* - * Ignore SIGUSR1, SIGUSR1 is used to make snapshots, - * and if someone wrote a faulty regexp for `ps' and - * fed it to `kill' the replication shouldn't die. - * Ignore SIGUSR2 as well, since one can be pretty - * inventive in ways of shooting oneself in the foot. - * Ignore SIGPIPE, otherwise we may receive SIGPIPE - * when trying to write to the log. + * Init a read event: when replica closes its end + * of the socket, we can read EOF and shutdown the + * relay. */ - if (sigaction(SIGUSR1, &sa, NULL) == -1 || - sigaction(SIGUSR2, &sa, NULL) == -1 || - sigaction(SIGPIPE, &sa, NULL) == -1) { - - say_syserror("sigaction"); - } - - say_crit("initialized"); - spawner_main_loop(); -} - -static int -spawner_unpack_cmsg(struct msghdr *msg) -{ - struct cmsghdr *control_message; - for (control_message = CMSG_FIRSTHDR(msg); - control_message != NULL; - control_message = CMSG_NXTHDR(msg, control_message)) - if ((control_message->cmsg_level == SOL_SOCKET) && - (control_message->cmsg_type == SCM_RIGHTS)) - return *((int *) CMSG_DATA(control_message)); - assert(false); - return -1; -} - -/** Replication spawner process main loop. */ -static void -spawner_main_loop() -{ - struct msghdr msg; - struct iovec iov; - char control_buf[CMSG_SPACE(sizeof(int))]; - - size_t len; - iov.iov_base = &len; - iov.iov_len = sizeof(len); + struct ev_io sock_read_ev; + sock_read_ev.data = fiber(); + ev_io_init(&sock_read_ev, (ev_io_cb) fiber_schedule, + r->relay.sock, EV_READ); - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = &iov; - msg.msg_iovlen = 1; - msg.msg_control = control_buf; - msg.msg_controllen = sizeof(control_buf); + while (true) { + ev_io_start(loop(), &sock_read_ev); + fiber_yield(); + ev_io_stop(loop(), &sock_read_ev); - while (!spawner.killed) { - ssize_t msglen = recvmsg(spawner.sock, &msg, 0); - if (msglen == 0) { /* orderly master shutdown */ - say_info("Exiting: master shutdown"); - break; - } else if (msglen == -1) { - if (errno == EINTR) - continue; - say_syserror("recvmsg"); - /* continue, the error may be temporary */ - break; - } + uint8_t data; + int rc = recv(r->relay.sock, &data, sizeof(data), 0); - struct relay relay; - int sock = spawner_unpack_cmsg(&msg); - msglen = read(spawner.sock, &relay, len); - relay.sock = sock; - if (msglen == 0) { /* orderly master shutdown */ - say_info("Exiting: master shutdown"); - break; - } else if (msglen == -1) { - if (errno == EINTR) - continue; - say_syserror("recvmsg"); - /* continue, the error may be temporary */ - break; + if (rc == 0 || (rc < 0 && errno == ECONNRESET)) { + say_info("the replica has closed its socket, exiting"); + goto end; } - assert(msglen == sizeof(relay)); - spawner_create_replication_relay(&relay); + if (rc < 0) + say_syserror("recv"); } - spawner_shutdown(); +end: + recovery_stop_local(r); + ev_break(loop(), EVBREAK_ALL); } -/** Replication spawner shutdown. */ -static void -spawner_shutdown() -{ - /* - * There is no need to ever use signals with the spawner - * process. If someone did send spawner a signal by - * mistake, at least make a squeak in the error log before - * dying. - */ - if (spawner.killed) - say_info("Terminated by signal %d", (int) spawner.killed); - - /* close socket */ - close(spawner.sock); - - /* kill all children */ - spawner_shutdown_children(); - - exit(EXIT_SUCCESS); -} - -/** Replication spawner signal handler for terminating signals. */ -static void spawner_signal_handler(int signal) -{ - spawner.killed = signal; -} - -/** Wait for a terminated child. */ -static void -spawner_sigchld_handler(int signo __attribute__((unused))) -{ - static const char waitpid_failed[] = "spawner: waitpid() failed\n"; - do { - int exit_status; - pid_t pid = waitpid(-1, &exit_status, WNOHANG); - switch (pid) { - case -1: - if (errno != ECHILD) { - int r = write(STDERR_FILENO, waitpid_failed, - sizeof(waitpid_failed) - 1); - (void) r; /* -Wunused-result warning suppression */ - } - return; - case 0: /* no more changes in children status */ - return; - default: - spawner.child_count--; - } - } while (spawner.child_count > 0); -} - -/** Create replication client handler process. */ -static int -spawner_create_replication_relay(struct relay *relay) +static void * +replication_subscribe_thread(void *arg) { - /* flush buffers to avoid multiple output */ - /* https://github.com/tarantool/tarantool/issues/366 */ - fflush(stdout); - fflush(stderr); - pid_t pid = fork(); + struct recovery_state *r = (struct recovery_state *) arg; - if (pid < 0) { - say_syserror("fork"); - return -1; - } + /* Turn off the non-blocking mode, if any. */ + int nonblock = sio_getfl(r->relay.sock) & O_NONBLOCK; + sio_setfl(r->relay.sock, O_NONBLOCK, 0); + auto socket_guard = make_scoped_guard([=]{ + /* Restore non-blocking mode */ + sio_setfl(r->relay.sock, O_NONBLOCK, nonblock); + }); - if (pid == 0) { - ev_loop_fork(loop()); - ev_run(loop(), EVRUN_NOWAIT); - close(spawner.sock); - replication_relay_loop(relay); - } else { - spawner.child_count++; - close(relay->sock); - say_info("created a replication relay: pid = %d", (int) pid); - } + struct fiber *f = fiber_new("subscribe", replication_subscribe_f); + fiber_start(f, r); + ev_run(loop(), 0); + say_crit("exiting the relay loop"); return 0; } -/** Replicator spawner shutdown: kill and wait for children. */ -static void -spawner_shutdown_children() +/** Replication acceptor fiber handler. */ +void +replication_subscribe(int fd, struct xrow_header *packet) { - int kill_signo = SIGTERM, signo; - sigset_t mask, orig_mask, alarm_mask; - -retry: - sigemptyset(&mask); - sigaddset(&mask, SIGCHLD); - sigaddset(&mask, SIGALRM); - /* - * We're going to kill the entire process group, which - * we're part of. Handle the signal sent to ourselves. - */ - sigaddset(&mask, kill_signo); - - if (spawner.child_count == 0) - return; - - /* Block SIGCHLD and SIGALRM to avoid races. */ - if (sigprocmask(SIG_BLOCK, &mask, &orig_mask)) { - say_syserror("sigprocmask"); - return; - } - - /* We'll wait for children no longer than 5 sec. */ - alarm(5); - - say_info("sending signal %d to %d children", kill_signo, - (int) spawner.child_count); - - kill(0, kill_signo); + struct recovery_state *r; + r = recovery_new(cfg_gets("snap_dir"), cfg_gets("wal_dir"), + replication_send_row, NULL); - say_info("waiting for children for up to 5 seconds"); + auto recovery_guard = make_scoped_guard([&]{ + recovery_delete(r); + }); - while (spawner.child_count > 0) { - sigwait(&mask, &signo); - if (signo == SIGALRM) { /* timed out */ - break; - } - else if (signo != kill_signo) { - assert(signo == SIGCHLD); - spawner_sigchld_handler(signo); - } - } + r->relay.sock = fd; + r->relay.sync = packet->sync; - /* Reset the alarm. */ - alarm(0); + struct tt_uuid uu = uuid_nil, server_uuid = uuid_nil; - /* Clear possibly pending SIGALRM. */ - sigpending(&alarm_mask); - if (sigismember(&alarm_mask, SIGALRM)) { - sigemptyset(&alarm_mask); - sigaddset(&alarm_mask, SIGALRM); - sigwait(&alarm_mask, &signo); - } + xrow_decode_subscribe(packet, &uu, &server_uuid, &r->vclock); - /* Restore the old mask. */ - if (sigprocmask(SIG_SETMASK, &orig_mask, NULL)) { - say_syserror("sigprocmask"); - return; + /** + * Check that the given UUID matches the UUID of the + * cluster this server belongs to. Used to handshake + * replica connect, and refuse a connection from a replica + * which belongs to a different cluster. + */ + if (!tt_uuid_is_equal(&uu, &cluster_id)) { + tnt_raise(ClientError, ER_CLUSTER_ID_MISMATCH, + tt_uuid_str(&uu), tt_uuid_str(&cluster_id)); } - if (kill_signo == SIGTERM) { - kill_signo = SIGKILL; - goto retry; + /* Check server uuid */ + r->server_id = schema_find_id(SC_CLUSTER_ID, 1, + tt_uuid_str(&server_uuid), UUID_STR_LEN); + if (r->server_id == SC_ID_NIL) { + tnt_raise(ClientError, ER_UNKNOWN_SERVER, + tt_uuid_str(&server_uuid)); } -} - -/** A libev callback invoked when a relay client socket is ready - * for read. This currently only happens when the client closes - * its socket, and we get an EOF. - */ -static void -replication_relay_recv(ev_loop * /* loop */, struct ev_io *w, int __attribute__((unused)) revents) -{ - int replica_sock = (int) (intptr_t) w->data; - uint8_t data; - - int rc = recv(replica_sock, &data, sizeof(data), 0); - if (rc == 0 || (rc < 0 && errno == ECONNRESET)) { - say_info("the replica has closed its socket, exiting"); - exit(EXIT_SUCCESS); - } - if (rc < 0) - say_syserror("recv"); + char name[FIBER_NAME_MAX]; + struct sockaddr_storage peer; + socklen_t addrlen = sizeof(peer); + getpeername(r->relay.sock, ((struct sockaddr*)&peer), &addrlen); + snprintf(name, sizeof(name), "relay/%s", + sio_strfaddr((struct sockaddr *)&peer, addrlen)); - exit(EXIT_FAILURE); + struct cord cord; + cord_start(&cord, name, replication_subscribe_thread, r); + cord_cojoin(&cord); } /** Send a single row to the client. */ static void -replication_relay_send_row(struct recovery_state *r, void * /* param */, +replication_send_row(struct recovery_state *r, void * /* param */, struct xrow_header *packet) { assert(iproto_type_is_dml(packet->type)); - /* Don't duplicate data */ + /* + * If packet->server_id == 0 this is a snapshot packet. + * (JOIN request). In this case, send every row. + * Otherwise, we're feeding a WAL, thus responding to + * SUBSCRIBE request. In that case, only send a row if + * it not from the same server (i.e. don't send + * replica's own rows back). + */ if (packet->server_id == 0 || packet->server_id != r->server_id) { packet->sync = r->relay.sync; struct iovec iov[XROW_IOVMAX]; int iovcnt = xrow_to_iovec(packet, iov); sio_writev_all(r->relay.sock, iov, iovcnt); } - /* * Update local vclock. During normal operation wal_write() * updates local vclock. In relay mode we have to update @@ -675,90 +240,3 @@ replication_relay_send_row(struct recovery_state *r, void * /* param */, */ vclock_follow(&r->vclock, packet->server_id, packet->lsn); } - -static void -replication_relay_subscribe(struct recovery_state *r) -{ - /* Set LSNs */ - vclock_copy(&r->vclock, &r->relay.vclock); - /* Set server_id */ - r->server_id = r->relay.server_id; - - recovery_follow_local(r, 0.1); -} - -/** The main loop of replication client service process. */ -static void -replication_relay_loop(struct relay *relay) -{ - struct sigaction sa; - - /* Set process title and fiber name. - * Even though we use only the main fiber, the logger - * uses the current fiber name. - */ - struct sockaddr_storage peer; - socklen_t addrlen = sizeof(peer); - getpeername(relay->sock, ((struct sockaddr*)&peer), &addrlen); - title("relay", "%s", sio_strfaddr((struct sockaddr *)&peer, addrlen)); - fiber_set_name(fiber(), status); - - /* init signals */ - memset(&sa, 0, sizeof(sa)); - sigemptyset(&sa.sa_mask); - - /* Reset all signals to their defaults. */ - sa.sa_handler = SIG_DFL; - if (sigaction(SIGCHLD, &sa, NULL) == -1 || - sigaction(SIGHUP, &sa, NULL) == -1 || - sigaction(SIGINT, &sa, NULL) == -1 || - sigaction(SIGTERM, &sa, NULL) == -1) - say_syserror("sigaction"); - - /* - * Ignore SIGPIPE, we already handle EPIPE. - * Ignore SIGUSR1, SIGUSR1 is used to make snapshots, - * and if someone wrote a faulty regexp for `ps' and - * fed it to `kill' the replication shouldn't die. - * Ignore SIGUSR2 as well, since one can be pretty - * inventive in ways of shooting oneself in the foot. - */ - sa.sa_handler = SIG_IGN; - if (sigaction(SIGPIPE, &sa, NULL) == -1 || - sigaction(SIGUSR1, &sa, NULL) == -1 || - sigaction(SIGUSR2, &sa, NULL) == -1) { - - say_syserror("sigaction"); - } - - /* - * Init a read event: when replica closes its end - * of the socket, we can read EOF and shutdown the - * relay. - */ - struct ev_io sock_read_ev; - sock_read_ev.data = (void *)(intptr_t) relay->sock; - ev_io_init(&sock_read_ev, replication_relay_recv, - relay->sock, EV_READ); - ev_io_start(loop(), &sock_read_ev); - /** Turn off the non-blocking mode,if any. */ - sio_setfl(relay->sock, O_NONBLOCK, 0); - - /* Initialize the recovery process */ - struct recovery_state *r = NULL; - try { - r = recovery_new(cfg_snap_dir, cfg_wal_dir, - replication_relay_send_row, NULL); - r->relay = *relay; /* copy relay state to recovery */ - - assert(r->relay.type == IPROTO_SUBSCRIBE); - replication_relay_subscribe(r); - } catch (Exception *e) { - say_error("relay error: %s", e->errmsg()); - if (r) - recovery_delete(r); - exit(EXIT_FAILURE); - } - /** Return control back to the sched. */ - fiber_yield(); -} diff --git a/src/box/replication.h b/src/box/replication.h index 3798b2a0335390b067293ab7ecc4c83a2e16c88b..d46fa04117b629e342eaaa978aad8b2f118bcf5f 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -30,14 +30,6 @@ */ struct xrow_header; -/** - * Pre-fork replication spawner process. - * - * @return None. Panics and exits on error. - */ -void -replication_prefork(const char *snap_dir, const char *wal_dir); - void replication_join(int fd, struct xrow_header *packet); diff --git a/src/coio.cc b/src/coio.cc index 3acd6a9183329c0c9504c504567ddfb72fc426dd..04241c5a906d46432e1ada6d31e68d891b3c6ca1 100644 --- a/src/coio.cc +++ b/src/coio.cc @@ -45,7 +45,6 @@ struct CoioGuard { ~CoioGuard() { ev_io_stop(loop(), ev_io); } }; -typedef void (*ev_io_cb)(ev_loop *, ev_io *, int); typedef void (*ev_stat_cb)(ev_loop *, ev_stat *, int); /** Note: this function does not throw */ diff --git a/src/coio.h b/src/coio.h index 417f05911f0bc785c41c60ede6fa5ac70644d18c..5f483c8ac175209f91fb0119b7700a4b85e5b9d0 100644 --- a/src/coio.h +++ b/src/coio.h @@ -44,6 +44,8 @@ struct coio_service void *handler_param; }; +typedef void (*ev_io_cb)(ev_loop *, ev_io *, int); + int coio_connect_timeout(struct ev_io *coio, struct uri *uri, struct sockaddr *addr, socklen_t *addr_len, ev_tstamp timeout); diff --git a/src/coro.cc b/src/coro.cc index eb3aebaafc27111563bdfd9616daf5c836737e9d..0badc18578f326eecf853e039b4f60e64d22c945 100644 --- a/src/coro.cc +++ b/src/coro.cc @@ -39,6 +39,7 @@ void tarantool_coro_create(struct tarantool_coro *coro, + struct slab_cache *slabc, void (*f) (void *), void *data) { const int page = sysconf(_SC_PAGESIZE); @@ -47,7 +48,7 @@ tarantool_coro_create(struct tarantool_coro *coro, /* TODO: guard pages */ coro->stack_size = page * 16 - slab_sizeof(); - coro->stack = (char *) slab_get(&cord()->slabc, coro->stack_size) + coro->stack = (char *) slab_get(slabc, coro->stack_size) + slab_sizeof(); if (coro->stack == NULL) { @@ -62,10 +63,10 @@ tarantool_coro_create(struct tarantool_coro *coro, } void -tarantool_coro_destroy(struct tarantool_coro *coro) +tarantool_coro_destroy(struct tarantool_coro *coro, struct slab_cache *slabc) { if (coro->stack != NULL) { - slab_put(&cord()->slabc, (struct slab *) + slab_put(slabc, (struct slab *) ((char *) coro->stack - slab_sizeof())); } } diff --git a/src/coro.h b/src/coro.h index 273e773790040f6ab45255a3a8deaea2d5788f29..a7113449b3cbec9b5bc14b301930e72bbc9b6332 100644 --- a/src/coro.h +++ b/src/coro.h @@ -40,8 +40,10 @@ struct tarantool_coro { void tarantool_coro_create(struct tarantool_coro *ctx, + struct slab_cache *cache, void (*f) (void *), void *data); void -tarantool_coro_destroy(struct tarantool_coro *ctx); +tarantool_coro_destroy(struct tarantool_coro *ctx, + struct slab_cache *cache); #endif /* TARANTOOL_CORO_H_INCLUDED */ diff --git a/src/fiber.cc b/src/fiber.cc index 90a37e15ce2d3fcfd02472359d291e039b242c00..1d7bfae97028cd6eefbc2827bcf848dc3a4d9d91 100644 --- a/src/fiber.cc +++ b/src/fiber.cc @@ -468,7 +468,8 @@ fiber_new(const char *name, void (*f) (va_list)) } else { fiber = (struct fiber *) mempool_alloc0(&cord->fiber_pool); - tarantool_coro_create(&fiber->coro, fiber_loop, NULL); + tarantool_coro_create(&fiber->coro, &cord->slabc, + fiber_loop, NULL); region_create(&fiber->gc, &cord->slabc); @@ -497,20 +498,20 @@ fiber_new(const char *name, void (*f) (va_list)) * cord_destroy(). */ void -fiber_destroy(struct fiber *f) +fiber_destroy(struct cord *cord, struct fiber *f) { if (f == fiber()) { /** End of the application. */ - assert(cord() == &main_cord); + assert(cord == &main_cord); return; } - assert(f != &cord()->sched); + assert(f != &cord->sched); trigger_destroy(&f->on_yield); trigger_destroy(&f->on_stop); rlist_del(&f->state); region_destroy(&f->gc); - tarantool_coro_destroy(&f->coro); + tarantool_coro_destroy(&f->coro, &cord->slabc); Exception::cleanup(&f->exception); } @@ -519,9 +520,9 @@ fiber_destroy_all(struct cord *cord) { struct fiber *f; rlist_foreach_entry(f, &cord->alive, link) - fiber_destroy(f); + fiber_destroy(cord, f); rlist_foreach_entry(f, &cord->dead, link) - fiber_destroy(f); + fiber_destroy(cord, f); } void @@ -539,6 +540,7 @@ cord_create(struct cord *cord, const char *name) cord->fiber_registry = mh_i32ptr_new(); /* sched fiber is not present in alive/ready/dead list. */ + cord->call_stack_depth = 0; cord->sched.fid = 1; fiber_reset(&cord->sched); Exception::init(&cord->sched.exception); @@ -556,6 +558,7 @@ cord_create(struct cord *cord, const char *name) void cord_destroy(struct cord *cord) { + slab_cache_set_thread(&cord->slabc); ev_async_stop(cord->loop, &cord->wakeup_event); /* Only clean up if initialized. */ if (cord->fiber_registry) { @@ -583,6 +586,7 @@ void *cord_thread_func(void *p) { struct cord_thread_arg *ct_arg = (struct cord_thread_arg *) p; cord() = ct_arg->cord; + slab_cache_set_thread(&cord()->slabc); struct cord *cord = cord(); cord_create(cord, ct_arg->name); /** Can't possibly be the main thread */ diff --git a/src/fiber.h b/src/fiber.h index 3acbece774d38237899b2290de517cc7ded7ef0a..b8f880fcfcef0d7d2f25f3c0409130db8ad4e9b1 100644 --- a/src/fiber.h +++ b/src/fiber.h @@ -43,6 +43,9 @@ #if defined(__cplusplus) #include "exception.h" +#else +#define class struct +class Exception; #endif /* defined(__cplusplus) */ #include "salad/rlist.h" @@ -157,7 +160,6 @@ struct fiber { }; enum { FIBER_CALL_STACK = 16 }; -class Exception; /** * @brief An independent execution unit that can be managed by a separate OS @@ -167,8 +169,6 @@ class Exception; struct cord { /** The fiber that is currently being executed. */ struct fiber *fiber; - /** The "main" fiber of this cord, the scheduler. */ - struct fiber sched; struct ev_loop *loop; /** Depth of the fiber call stack. */ int call_stack_depth; @@ -197,6 +197,8 @@ struct cord { struct mempool fiber_pool; /** A runtime slab cache for general use in this cord. */ struct slab_cache slabc; + /** The "main" fiber of this cord, the scheduler. */ + struct fiber sched; char name[FIBER_NAME_MAX]; }; diff --git a/src/iobuf.cc b/src/iobuf.cc index 54fcb03827bf09d906347dceed59b56231d55b70..eba29561d33a988129474007b08b1189346bbee7 100644 --- a/src/iobuf.cc +++ b/src/iobuf.cc @@ -282,6 +282,8 @@ SLIST_HEAD(iobuf_cache, iobuf) iobuf_cache; struct iobuf * iobuf_new(const char *name) { + /* Does not work in multiple cords yet. */ + assert(cord_is_main()); struct iobuf *iobuf; if (SLIST_EMPTY(&iobuf_cache)) { iobuf = (struct iobuf *) mempool_alloc(&iobuf_pool); diff --git a/src/lib/small/slab_cache.c b/src/lib/small/slab_cache.c index c64eb99e5dfd3daf9e0dc61d39c65c7a3ba9b514..e7a41dc38f4170c8e2a568f83e8d3cdec26dfb29 100644 --- a/src/lib/small/slab_cache.c +++ b/src/lib/small/slab_cache.c @@ -66,6 +66,7 @@ static inline void slab_assert(struct slab_cache *cache, struct slab *slab) { (void) slab; + assert(pthread_equal(cache->thread_id, pthread_self())); assert(slab->magic == slab_magic); assert(slab->order <= cache->order_max + 1); if (slab->order <= cache->order_max) { @@ -179,6 +180,7 @@ slab_cache_create(struct slab_cache *cache, struct slab_arena *arena) slab_list_create(&cache->allocated); for (uint8_t i = 0; i <= cache->order_max; i++) slab_list_create(&cache->orders[i]); + slab_cache_set_thread(cache); } void diff --git a/src/lib/small/slab_cache.h b/src/lib/small/slab_cache.h index abc684f4a23d002c50cfd2bf5944d1b9d4b3b802..aa103f3cdd72795e47acaffd7985e24101889af1 100644 --- a/src/lib/small/slab_cache.h +++ b/src/lib/small/slab_cache.h @@ -34,6 +34,7 @@ #include <assert.h> #include "salad/rlist.h" #include "slab_arena.h" +#include <pthread.h> #if defined(__cplusplus) extern "C" { @@ -159,6 +160,9 @@ struct slab_cache { * next_in_list link may be reused for some other purpose. */ struct slab_list orders[ORDER_MAX+1]; +#ifndef _NDEBUG + pthread_t thread_id; +#endif }; void @@ -224,6 +228,18 @@ slab_order_size(struct slab_cache *cache, uint8_t order) return size << (order + cache->order0_size_lb); } +/** + * Debug only: track that all allocations + * are made from a single thread. + */ +static inline void +slab_cache_set_thread(struct slab_cache *cache) +{ + (void) cache; +#ifndef _NDEBUG + cache->thread_id = pthread_self(); +#endif +} #if defined(__cplusplus) } /* extern "C" */