diff --git a/src/box/recovery.cc b/src/box/recovery.cc index 39cf403471dbce4a3fcc7a35972814387db73dc8..6ac977ac16b9840df957b5a1ccdf4d698f75f504 100644 --- a/src/box/recovery.cc +++ b/src/box/recovery.cc @@ -345,13 +345,15 @@ recover_snap(struct recovery_state *r) /** * Don't rescan the directory, it's done when * recovery is initialized. - * Don't check if the directory index is empty, there must - * be at least one existing snapshot, otherwise we would - * have created it from a bootstrap copy. */ struct vclock *res = vclockset_last(&r->snap_dir.index); + /* + * The only case when the directory index is empty is + * when someone has deleted a snapshot and tries to join + * as a replica. Our best effort is to not crash in such case. + */ if (res == NULL) - tnt_raise(ClientError, ER_MISSING_SNAPSHOT); + tnt_raise(ClientError, ER_MISSING_SNAPSHOT); int64_t signature = vclock_signature(res); struct xlog *snap = xlog_open(&r->snap_dir, signature, NONE); diff --git a/src/box/recovery.h b/src/box/recovery.h index 9ccc852e9f712c0b2546363d7404c29d6ff1047f..d5c99d274eb7f747b73d70a407f3009f1b4aa918 100644 --- a/src/box/recovery.h +++ b/src/box/recovery.h @@ -61,14 +61,6 @@ enum wal_mode { WAL_NONE = 0, WAL_WRITE, WAL_FSYNC, WAL_MODE_MAX }; /** String constants for the supported modes. */ extern const char *wal_mode_STRS[]; -/** State of a replication relay. */ -struct relay { - /** Replica connection */ - int sock; - /* Request sync */ - uint64_t sync; -}; - enum { REMOTE_SOURCE_MAXLEN = 1024 }; /* enough to fit URI with passwords */ /** State of a replication connection to the master */ @@ -101,12 +93,7 @@ struct recovery_state { * locally or send to the replica. */ struct fiber *watcher; - union { - /** slave->master state */ - struct remote remote; - /** master->slave state */ - struct relay relay; - }; + struct remote remote; /** * apply_row is a module callback invoked during initial * recovery and when reading rows from the master. diff --git a/src/box/replica.cc b/src/box/replica.cc index 11d3b825e6ed3355fde724fc39212c0fd56b6e24..15d02e1d6fcaa952887b8bfae849dfce136dc5bc 100644 --- a/src/box/replica.cc +++ b/src/box/replica.cc @@ -29,9 +29,6 @@ #include "replica.h" #include "recovery.h" #include "tarantool.h" -#include <sys/socket.h> -#include <netinet/in.h> -#include <arpa/inet.h> #include "xlog.h" #include "fiber.h" diff --git a/src/box/replication.cc b/src/box/replication.cc index 49061b9c9eee119541c5bfe656ac7a22b2513600..d292c432cbf8b75a8053f39e45e4f4ad98222f1b 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -42,61 +42,74 @@ #include "coeio.h" #include "coio.h" #include "cfg.h" +#include "trigger.h" static void -replication_send_row(struct recovery_state *r, void * /* param */, - struct xrow_header *packet); +replication_send_row(struct recovery_state *r, void *param, + struct xrow_header *packet); + +/** State of a replication relay. */ +class Relay { +public: + /** Replica connection */ + struct ev_io io; + /* Request sync */ + uint64_t sync; + struct recovery_state *r; -/** Replication acceptor fiber handler. */ -static void * -replication_join_thread(void *arg) + Relay(int fd_arg, uint64_t sync_arg) + { + r = recovery_new(cfg_gets("snap_dir"), cfg_gets("wal_dir"), + replication_send_row, this); + coio_init(&io); + io.fd = fd_arg; + sync = sync_arg; + } + ~Relay() + { + recovery_delete(r); + } +}; + +static inline void +relay_set_cord_name(int fd) { - struct recovery_state *r = (struct recovery_state *) arg; + char name[FIBER_NAME_MAX]; + struct sockaddr_storage peer; + socklen_t addrlen = sizeof(peer); + getpeername(fd, ((struct sockaddr*)&peer), &addrlen); + snprintf(name, sizeof(name), "relay/%s", + sio_strfaddr((struct sockaddr *)&peer, addrlen)); + cord_set_name(name); +} - /* Turn off the non-blocking mode, if any. */ - int nonblock = sio_getfl(r->relay.sock) & O_NONBLOCK; - sio_setfl(r->relay.sock, O_NONBLOCK, 0); - auto socket_guard = make_scoped_guard([=]{ - /* Restore non-blocking mode */ - sio_setfl(r->relay.sock, O_NONBLOCK, nonblock); - }); +void +replication_join_f(va_list ap) +{ + Relay *relay = va_arg(ap, Relay *); + struct recovery_state *r = relay->r; + relay_set_cord_name(relay->io.fd); /* Send snapshot */ recover_snap(r); /* Send response to JOIN command = end of stream */ struct xrow_header row; xrow_encode_vclock(&row, &r->vclock); - row.sync = r->relay.sync; + row.sync = relay->sync; struct iovec iov[XROW_IOVMAX]; int iovcnt = xrow_to_iovec(&row, iov); - sio_writev_all(r->relay.sock, iov, iovcnt); - + coio_writev(&relay->io, iov, iovcnt, 0); say_info("snapshot sent"); - return NULL; } void replication_join(int fd, struct xrow_header *packet) { - struct recovery_state *r; - r = recovery_new(cfg_gets("snap_dir"), cfg_gets("wal_dir"), - replication_send_row, NULL); - auto recovery_guard = make_scoped_guard([&]{ - recovery_delete(r); - }); - r->relay.sock = fd; - r->relay.sync = packet->sync; - - char name[FIBER_NAME_MAX]; - struct sockaddr_storage peer; - socklen_t addrlen = sizeof(peer); - getpeername(r->relay.sock, ((struct sockaddr*)&peer), &addrlen); - snprintf(name, sizeof(name), "relay/%s", - sio_strfaddr((struct sockaddr *)&peer, addrlen)); + Relay relay(fd, packet->sync); struct cord cord; - cord_start(&cord, name, replication_join_thread, r); + cord_costart(&cord, "join", replication_join_f, &relay); cord_cojoin(&cord); } @@ -108,77 +121,51 @@ replication_join(int fd, struct xrow_header *packet) static void replication_subscribe_f(va_list ap) { - struct recovery_state *r = va_arg(ap, struct recovery_state *); + Relay *relay = va_arg(ap, Relay *); + struct recovery_state *r = relay->r; + relay_set_cord_name(relay->io.fd); recovery_follow_local(r, 0.1); /* * Init a read event: when replica closes its end * of the socket, we can read EOF and shutdown the * relay. */ - struct ev_io sock_read_ev; - sock_read_ev.data = fiber(); - ev_io_init(&sock_read_ev, (ev_io_cb) fiber_schedule, - r->relay.sock, EV_READ); + struct ev_io read_ev; + read_ev.data = fiber(); + ev_io_init(&read_ev, (ev_io_cb) fiber_schedule, + relay->io.fd, EV_READ); while (true) { - ev_io_start(loop(), &sock_read_ev); + ev_io_start(loop(), &read_ev); fiber_yield(); - ev_io_stop(loop(), &sock_read_ev); + ev_io_stop(loop(), &read_ev); uint8_t data; - int rc = recv(r->relay.sock, &data, sizeof(data), 0); + int rc = recv(read_ev.fd, &data, sizeof(data), 0); if (rc == 0 || (rc < 0 && errno == ECONNRESET)) { say_info("the replica has closed its socket, exiting"); goto end; } - if (rc < 0) + if (rc < 0 && errno != EINTR && errno != EAGAIN && + errno != EWOULDBLOCK) say_syserror("recv"); } end: recovery_stop_local(r); - ev_break(loop(), EVBREAK_ALL); -} - -static void * -replication_subscribe_thread(void *arg) -{ - struct recovery_state *r = (struct recovery_state *) arg; - - /* Turn off the non-blocking mode, if any. */ - int nonblock = sio_getfl(r->relay.sock) & O_NONBLOCK; - sio_setfl(r->relay.sock, O_NONBLOCK, 0); - auto socket_guard = make_scoped_guard([=]{ - /* Restore non-blocking mode */ - sio_setfl(r->relay.sock, O_NONBLOCK, nonblock); - }); - - struct fiber *f = fiber_new("subscribe", replication_subscribe_f); - fiber_start(f, r); - ev_run(loop(), 0); - say_crit("exiting the relay loop"); - return 0; } /** Replication acceptor fiber handler. */ void replication_subscribe(int fd, struct xrow_header *packet) { - struct recovery_state *r; - r = recovery_new(cfg_gets("snap_dir"), cfg_gets("wal_dir"), - replication_send_row, NULL); - - auto recovery_guard = make_scoped_guard([&]{ - recovery_delete(r); - }); - - r->relay.sock = fd; - r->relay.sync = packet->sync; + Relay relay(fd, packet->sync); struct tt_uuid uu = uuid_nil, server_uuid = uuid_nil; + struct recovery_state *r = relay.r; xrow_decode_subscribe(packet, &uu, &server_uuid, &r->vclock); /** @@ -200,23 +187,17 @@ replication_subscribe(int fd, struct xrow_header *packet) tt_uuid_str(&server_uuid)); } - char name[FIBER_NAME_MAX]; - struct sockaddr_storage peer; - socklen_t addrlen = sizeof(peer); - getpeername(r->relay.sock, ((struct sockaddr*)&peer), &addrlen); - snprintf(name, sizeof(name), "relay/%s", - sio_strfaddr((struct sockaddr *)&peer, addrlen)); - struct cord cord; - cord_start(&cord, name, replication_subscribe_thread, r); + cord_costart(&cord, "subscribe", replication_subscribe_f, &relay); cord_cojoin(&cord); } /** Send a single row to the client. */ static void -replication_send_row(struct recovery_state *r, void * /* param */, - struct xrow_header *packet) +replication_send_row(struct recovery_state *r, void *param, + struct xrow_header *packet) { + Relay *relay = (Relay *) param; assert(iproto_type_is_dml(packet->type)); /* @@ -228,10 +209,10 @@ replication_send_row(struct recovery_state *r, void * /* param */, * replica's own rows back). */ if (packet->server_id == 0 || packet->server_id != r->server_id) { - packet->sync = r->relay.sync; + packet->sync = relay->sync; struct iovec iov[XROW_IOVMAX]; int iovcnt = xrow_to_iovec(packet, iov); - sio_writev_all(r->relay.sock, iov, iovcnt); + coio_writev(&relay->io, iov, iovcnt, 0); } /* * Update local vclock. During normal operation wal_write() diff --git a/src/fiber.cc b/src/fiber.cc index 1d7bfae97028cd6eefbc2827bcf848dc3a4d9d91..16312be9b3c0d8011fc633b275eaa1d27d5808ae 100644 --- a/src/fiber.cc +++ b/src/fiber.cc @@ -407,10 +407,15 @@ fiber_loop(void *data __attribute__((unused))) fiber_name(fiber)); panic("fiber `%s': exiting", fiber_name(fiber)); } - /** By convention, these triggers must not throw. */ + fiber_schedule_list(&fiber->wake); + /** + * By convention, these triggers must not throw. + * Call triggers after scheduling, since an + * on_stop trigger of the first fiber may + * break the event loop. + */ if (! rlist_empty(&fiber->on_stop)) trigger_run(&fiber->on_stop, fiber); - fiber_schedule_list(&fiber->wake); if (fiber->flags & FIBER_IS_JOINABLE) { /* * The fiber needs to be joined, @@ -615,6 +620,7 @@ void *cord_thread_func(void *p) return res; } + int cord_start(struct cord *cord, const char *name, void *(*f)(void *), void *arg) { @@ -655,6 +661,64 @@ cord_join(struct cord *cord) return res; } +void +break_ev_loop_f(struct trigger * /* trigger */, void * /* event */) +{ + ev_break(loop(), EVBREAK_ALL); +} + +struct costart_ctx +{ + fiber_func run; + void *arg; +}; + +/** Replication acceptor fiber handler. */ +static void * +cord_costart_thread_func(void *arg) +{ + struct costart_ctx ctx = *(struct costart_ctx *) arg; + free(arg); + + struct fiber *f = fiber_new("main", ctx.run); + + struct trigger break_ev_loop = { + rlist_nil, break_ev_loop_f, NULL, NULL + }; + /* + * Got to be in a trigger, to break the loop even + * in case of an exception. + */ + trigger_add(&f->on_stop, &break_ev_loop); + fiber_start(f, ctx.arg); + if (f->fid > 0) { + /* The fiber hasn't died right away at start. */ + ev_run(loop(), 0); + } + if (f->exception) { + Exception::move(&f->exception, &fiber()->exception); + fiber()->exception->raise(); + } + + return NULL; +} + +int +cord_costart(struct cord *cord, const char *name, fiber_func f, void *arg) +{ + /** Must be allocated to avoid races. */ + struct costart_ctx *ctx = (struct costart_ctx *) malloc(sizeof(*ctx)); + if (ctx == NULL) + return -1; + ctx->run = f; + ctx->arg = arg; + if (cord_start(cord, name, cord_costart_thread_func, ctx) == -1) { + free(ctx); + return -1; + } + return 0; +} + bool cord_is_main() { diff --git a/src/fiber.h b/src/fiber.h index b8f880fcfcef0d7d2f25f3c0409130db8ad4e9b1..7028b4f6ad49561b14d5b3313338230acad3b1ba 100644 --- a/src/fiber.h +++ b/src/fiber.h @@ -113,6 +113,8 @@ enum fiber_key { FIBER_KEY_MAX = 4 }; +typedef void(*fiber_func)(va_list); + struct fiber { struct tarantool_coro coro; /* A garbage-collected memory pool. */ @@ -220,6 +222,15 @@ int cord_start(struct cord *cord, const char *name, void *(*f)(void *), void *arg); +/** + * Like cord_start(), but starts the event loop and + * a fiber in the event loop. The event loop ends when the + * fiber in main fiber dies/returns. The exception of the main + * fiber is propagated to cord_cojoin(). + */ +int +cord_costart(struct cord *cord, const char *name, fiber_func f, void *arg); + /** * Wait for \a cord to terminate. If \a cord has already * terminated, then returns immediately. @@ -252,7 +263,6 @@ cord_is_main(); void fiber_init(void); void fiber_free(void); -typedef void(*fiber_func)(va_list); struct fiber * fiber_new(const char *name, fiber_func f);