diff --git a/src/box/box.cc b/src/box/box.cc index 6722e32f4df4f749c20f24e7c7c22f9c5561f1e9..9a89724200aefd021fa6f8ae252355e3164bf025 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -54,12 +54,16 @@ #include "cfg.h" #include "iobuf.h" #include "coio.h" +#include "cluster.h" /* replica */ +#include "replica.h" static void process_ro(struct request *request, struct port *port); box_process_func box_process = process_ro; struct recovery_state *recovery; +static struct replica replica_buf; /* used to store the single instance */ + bool snapshot_in_progress = false; static bool box_init_done = false; @@ -165,31 +169,31 @@ box_check_config() extern "C" void box_set_replication_source(void) { + if (recovery->writer == NULL) { + /* + * Do nothing, we're in local hot standby mode, the server + * will automatically begin following the replica when local + * hot standby mode is finished, see box_init(). + */ + return; + } + const char *source = cfg_gets("replication_source"); - bool old_is_replica = recovery->replica.reader; - bool new_is_replica = source != NULL; - - if (old_is_replica != new_is_replica || - (old_is_replica && - (strcmp(source, recovery->replica.source) != 0))) { - - if (recovery->writer) { - if (old_is_replica) - recovery_stop_replica(recovery); - recovery_set_replica(recovery, source); - if (recovery_has_replica(recovery)) - recovery_follow_replica(recovery); - } else { - /* - * Do nothing, we're in local hot - * standby mode, the server - * will automatically begin following - * the replica when local hot standby - * mode is finished, see - * box_leave_local_hot_standby_mode() - */ - } + + /* This hook is only invoked if source has changed */ + if (replica != NULL) { + replica_stop(replica); /* cancels a background fiber */ + replica_destroy(replica); + replica = NULL; } + + if (source == NULL) + return; + + /* Start a new replication client using provided URI */ + replica_create(&replica_buf, source); + replica = &replica_buf; + replica_start(replica, recovery); /* starts a background fiber */ } extern "C" void @@ -635,22 +639,32 @@ box_init(void) recovery = recovery_new(cfg_gets("snap_dir"), cfg_gets("wal_dir"), recover_row, NULL); - recovery_set_replica(recovery, - cfg_gets("replication_source")); recovery_setup_panic(recovery, cfg_geti("panic_on_snap_error"), cfg_geti("panic_on_wal_error")); - + const char *source = cfg_gets("replication_source"); if (recovery_has_data(recovery)) { /* Tell Sophia engine LSN it must recover to. */ int64_t checkpoint_id = recovery_last_checkpoint(recovery); engine_recover_to_checkpoint(checkpoint_id); - } else if (recovery_has_replica(recovery)) { + } else if (source != NULL) { + /* Generate Server-UUID */ + tt_uuid_create(&recovery->server_uuid); + /* Initialize a new replica */ engine_begin_join(); - replica_bootstrap(recovery); + + /* Add a surrogate server id for snapshot rows */ + vclock_add_server(&recovery->vclock, 0); + + /* Bootstrap from replica */ + replica_create(&replica_buf, source); + replica = &replica_buf; + replica_start(replica, recovery); + replica_join(replica); /* throws on failure */ + int64_t checkpoint_id = vclock_sum(&recovery->vclock); engine_checkpoint(checkpoint_id); } else { @@ -680,8 +694,16 @@ box_init(void) stat_cleanup(stat_base, IPROTO_TYPE_STAT_MAX); - if (recovery_has_replica(recovery)) - recovery_follow_replica(recovery); + if (source != NULL) { + if (replica == NULL) { + replica_create(&replica_buf, source); + replica = &replica_buf; + } /* else re-use instance from bootstrap */ + /* Follow replica */ + assert(recovery->writer); + replica_start(replica, recovery); + } + /* Enter read-write mode. */ if (recovery->server_id > 0) box_set_ro(false); diff --git a/src/box/cluster.cc b/src/box/cluster.cc index 916bd37c4456b3ce384e8389d022d40c710645ee..aeb21d68351d5803204a43e71fdee409ed1a460e 100644 --- a/src/box/cluster.cc +++ b/src/box/cluster.cc @@ -31,6 +31,7 @@ #include "box.h" #include "cluster.h" #include "recovery.h" +#include "replica.h" /** * Globally unique identifier of this cluster. @@ -38,6 +39,8 @@ */ tt_uuid cluster_id; +struct replica *replica; + extern "C" struct vclock * cluster_clock() { diff --git a/src/box/cluster.h b/src/box/cluster.h index da584bd62866f7654aa635ed5d153214d793a81a..8ac9e9ad130b9923cf4e271f7828495a9f975d82 100644 --- a/src/box/cluster.h +++ b/src/box/cluster.h @@ -87,6 +87,8 @@ extern tt_uuid cluster_id; extern "C" struct vclock * cluster_clock(); +extern struct replica *replica; + /* }}} */ /** {{{ Cluster server id API **/ diff --git a/src/box/lua/info.cc b/src/box/lua/info.cc index 3628417960284327a270a5ce736c147029b99e00..28372d03815c264540c72b668e73c1d533f45673 100644 --- a/src/box/lua/info.cc +++ b/src/box/lua/info.cc @@ -31,6 +31,8 @@ #include "info.h" +#include <ctype.h> /* tolower() */ + extern "C" { #include <lua.h> #include <lauxlib.h> @@ -48,13 +50,24 @@ extern "C" { static int lbox_info_replication(struct lua_State *L) { - struct recovery_state *r = recovery; - struct replica *replica = &r->replica; - lua_newtable(L); + if (replica == NULL) { + lua_pushstring(L, "status"); + lua_pushstring(L, "off"); + lua_settable(L, -3); + return 1; + } + + /* Get replica state in lower case */ + static char status[16]; + char *d = status; + const char *s = replica_state_strs[replica->state] + strlen("REPLICA_"); + assert(strlen(s) < sizeof(status)); + while ((*(d++) = tolower(*(s++)))); + lua_pushstring(L, "status"); - lua_pushstring(L, replica->status); + lua_pushstring(L, status); lua_settable(L, -3); if (replica->reader) { diff --git a/src/box/recovery.cc b/src/box/recovery.cc index ae76f2a4e618fc1a767d82d40ca7e7f48cbd5f52..599e8dbf756be92b45f0001b7bc5fbbd73b3e8b3 100644 --- a/src/box/recovery.cc +++ b/src/box/recovery.cc @@ -91,8 +91,8 @@ * IR -> HS # recovery_follow_local() * IRR -> RR # recovery_follow_local() * HS -> M # recovery_finalize() - * M -> R # recovery_follow_replica() - * R -> M # recovery_stop_replica() + * M -> R # remote_start() + * R -> M # remote_stop() */ /* {{{ LSN API */ @@ -172,7 +172,6 @@ recovery_new(const char *snap_dirname, const char *wal_dirname, xdir_check(&r->wal_dir); r->watcher = NULL; - recovery_init_replica(r); guard.is_active = false; return r; diff --git a/src/box/recovery.h b/src/box/recovery.h index 134ca1d6546ffbd09fecf51b8cd153ed06e8c09d..ef1a574bb287e8d93e05c52a59e96c4d7bee788c 100644 --- a/src/box/recovery.h +++ b/src/box/recovery.h @@ -36,7 +36,6 @@ #include "vclock.h" #include "tt_uuid.h" #include "wal.h" -#include "replica.h" #if defined(__cplusplus) extern "C" { @@ -67,7 +66,6 @@ struct recovery_state { * locally or send to the replica. */ struct fiber *watcher; - struct replica replica; /** * apply_row is a module callback invoked during initial * recovery and when reading rows from the master. diff --git a/src/box/relay.cc b/src/box/relay.cc index a5beec36ba59d80ad77e04c295715b5aa0e61a73..ee2fbc008ed98d422086d0abadbf5c2fb9be644c 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -57,8 +57,7 @@ Relay::Relay(int fd_arg, uint64_t sync_arg) recovery_setup_panic(r, cfg_geti("panic_on_snap_error"), cfg_geti("panic_on_wal_error")); - coio_init(&io); - io.fd = fd_arg; + coio_init(&io, fd_arg); sync = sync_arg; wal_dir_rescan_delay = cfg_getd("wal_dir_rescan_delay"); } diff --git a/src/box/replica.cc b/src/box/replica.cc index 40626621bb98ab5c446624de2dd063cd88d0ca9a..1b77acba2ef716530d2862864fabff716365409f 100644 --- a/src/box/replica.cc +++ b/src/box/replica.cc @@ -29,12 +29,11 @@ * SUCH DAMAGE. */ #include "replica.h" -#include "recovery.h" -#include "main.h" #include "xlog.h" #include "fiber.h" #include "scoped_guard.h" +#include "coio.h" #include "coio_buf.h" #include "recovery.h" #include "xrow.h" @@ -43,6 +42,15 @@ #include "iproto_constants.h" static const int RECONNECT_DELAY = 1.0; +STRS(replica_state, replica_STATE); + +static inline void +replica_set_state(struct replica *replica, enum replica_state state) +{ + replica->state = state; + say_debug("=> %s", replica_state_strs[state] + + strlen("REPLICA_")); +} static void replica_read_row(struct ev_io *coio, struct iobuf *iobuf, @@ -81,13 +89,16 @@ replica_write_row(struct ev_io *coio, const struct xrow_header *row) coio_writev(coio, iov, iovcnt, 0); } +/** + * Connect to a remote host and authenticate the client. + */ static void -replica_connect(struct recovery_state *r, struct ev_io *coio, - struct iobuf *iobuf) +replica_connect(struct replica *replica, struct ev_io *coio, + struct iobuf *iobuf) { + assert(replica->io.fd < 0); char greeting[IPROTO_GREETING_SIZE]; - struct replica *replica = &r->replica; struct uri *uri = &replica->uri; /* * coio_connect() stores resolved address to \a &replica->addr @@ -98,20 +109,27 @@ replica_connect(struct recovery_state *r, struct ev_io *coio, * replica->addr_len will be different even for same uri. */ replica->addr_len = sizeof(replica->addrstorage); - /* Prepare null-terminated strings for coio_connect() */ + replica_set_state(replica, REPLICA_CONNECT); coio_connect(coio, uri, &replica->addr, &replica->addr_len); assert(coio->fd >= 0); coio_readn(coio, greeting, sizeof(greeting)); - say_crit("connected to %s", sio_strfaddr(&replica->addr, - replica->addr_len)); + if (!replica->warning_said) { + say_info("connected to %s", sio_strfaddr(&replica->addr, + replica->addr_len)); + } + + /* Don't display previous error messages in box.info.replication */ + diag_clear(&fiber()->diag); /* Perform authentication if user provided at least login */ - if (!replica->uri.login) + if (!uri->login) { + replica_set_state(replica, REPLICA_CONNECTED); return; + } /* Authenticate */ - say_debug("authenticating..."); + replica_set_state(replica, REPLICA_AUTH); struct xrow_header row; xrow_encode_auth(&row, greeting, uri->login, uri->login_len, uri->password, @@ -125,56 +143,28 @@ replica_connect(struct recovery_state *r, struct ev_io *coio, say_info("authenticated"); } -void -replica_bootstrap(struct recovery_state *r) +/** + * Execute and process JOIN request (bootstrap the server). + */ +static void +replica_process_join(struct replica *replica, struct recovery_state *r, + struct ev_io *coio, struct iobuf *iobuf) { - say_info("bootstrapping a replica"); - struct replica *replica = &r->replica; - assert(recovery_has_replica(r)); - - /* Generate Server-UUID */ - tt_uuid_create(&r->server_uuid); - - struct ev_io coio; - coio_init(&coio); - struct iobuf *iobuf = iobuf_new(); - auto coio_guard = make_scoped_guard([&] { - iobuf_delete(iobuf); - evio_close(loop(), &coio); - }); - - for (;;) { - try { - replica_connect(r, &coio, iobuf); - replica->warning_said = false; - break; - } catch (FiberCancelException *e) { - throw; - } catch (Exception *e) { - if (! replica->warning_said) { - say_error("can't connect to master"); - e->log(); - say_info("will retry every %i second", - RECONNECT_DELAY); - replica->warning_said = true; - } - iobuf_reset(iobuf); - evio_close(loop(), &coio); - } - fiber_sleep(RECONNECT_DELAY); + if (!replica->warning_said) { + say_info("bootstrapping a replica from %s", + sio_strfaddr(&replica->addr, replica->addr_len)); } /* Send JOIN request */ struct xrow_header row; xrow_encode_join(&row, &r->server_uuid); - replica_write_row(&coio, &row); + replica_write_row(coio, &row); + replica_set_state(replica, REPLICA_BOOTSTRAP); - /* Add a surrogate server id for snapshot rows */ - vclock_add_server(&r->vclock, 0); + assert(vclock_has(&r->vclock, 0)); /* check for surrogate server_id */ while (true) { - replica_read_row(&coio, iobuf, &row); - + replica_read_row(coio, iobuf, &row); if (row.type == IPROTO_OK) { /* End of stream */ say_info("done"); @@ -196,85 +186,140 @@ replica_bootstrap(struct recovery_state *r) /* Replace server vclock using data from snapshot */ vclock_copy(&r->vclock, &vclock); - /* master socket closed by guard */ + /* Re-enable warnings after successful execution of JOIN */ + replica_set_state(replica, REPLICA_CONNECTED); + /* keep connection */ } +/** + * Execute and process SUBSCRIBE request (follow updates from a master). + */ static void -replica_set_status(struct replica *replica, const char *status) +replica_process_subscribe(struct replica *replica, struct recovery_state *r, + struct ev_io *coio, struct iobuf *iobuf) +{ + if (!replica->warning_said) { + say_info("subscribing to updates from %s", + sio_strfaddr(&replica->addr, replica->addr_len)); + } + + /* Send SUBSCRIBE request */ + struct xrow_header row; + xrow_encode_subscribe(&row, &cluster_id, &r->server_uuid, &r->vclock); + replica_write_row(coio, &row); + replica_set_state(replica, REPLICA_FOLLOW); + /* Re-enable warnings after successful execution of SUBSCRIBE */ + replica->warning_said = false; + + /** + * If there is an error in subscribe, it's + * sent directly in response to subscribe. + * If subscribe is successful, there is no + * "OK" response, but a stream of rows. + * from the binary log. + */ + while (true) { + replica_read_row(coio, iobuf, &row); + replica->lag = ev_now(loop()) - row.tm; + replica->last_row_time = ev_now(loop()); + + if (iproto_type_is_error(row.type)) + xrow_decode_error(&row); /* error */ + recovery_apply_row(r, &row); + + iobuf_reset(iobuf); + fiber_gc(); + } +} + +/** + * Write a nice error message to log file on SocketError or ClientError + * in pull_from_replica(). + */ +static inline void +replica_log_exception(struct replica *replica, Exception *e) { - replica->status = status; + if (replica->warning_said) + return; + switch (replica->state) { + case REPLICA_CONNECT: + say_info("can't connect to master"); + break; + case REPLICA_CONNECTED: + say_info("can't join/subscribe"); + break; + case REPLICA_AUTH: + say_info("failed to authenticate"); + break; + case REPLICA_FOLLOW: + case REPLICA_BOOTSTRAP: + say_info("can't read row"); + break; + default: + break; + } + e->log(); + replica->warning_said = true; } static void pull_from_replica(va_list ap) { + struct replica *replica = va_arg(ap, struct replica *); struct recovery_state *r = va_arg(ap, struct recovery_state *); - struct replica *replica = &r->replica; - struct ev_io coio; + struct ev_io *coio = &replica->io; struct iobuf *iobuf = iobuf_new(); ev_loop *loop = loop(); - coio_init(&coio); - - auto coio_guard = make_scoped_guard([&] { - iobuf_delete(iobuf); - evio_close(loop(), &coio); - }); + coio_init(coio, coio->fd); /* re-use connection if any */ + /* Re-connect loop */ while (true) { - const char *err = NULL; try { - struct xrow_header row; - if (! evio_has_fd(&coio)) { - replica_set_status(replica, "connecting"); - err = "can't connect to master"; - replica_connect(r, &coio, iobuf); - /* Send SUBSCRIBE request */ - err = "can't subscribe to master"; - xrow_encode_subscribe(&row, &cluster_id, - &r->server_uuid, &r->vclock); - replica_write_row(&coio, &row); - replica->warning_said = false; - replica_set_status(replica, "connected"); + if (coio->fd < 0) + replica_connect(replica, coio, iobuf); + + /* + * Execute JOIN if recovery is not finalized yet + * and SUBSCRIBE otherwise. + */ + if (r->writer == NULL) { + replica_process_join(replica, r, coio, iobuf); + ev_io_stop(loop(), coio); + /* keep connection */ + return; } - err = "can't read row"; - /** - * If there is an error in subscribe, it's - * sent directly in response to subscribe. - * If subscribe is successful, there is no - * "OK" response, but a stream of rows. - * from the binary log. + replica_process_subscribe(replica, r, coio, iobuf); + /* + * process_subscribe() has an infinity loop and + * can be stopped only using fiber_cancel() */ - replica_read_row(&coio, iobuf, &row); - err = NULL; - replica->lag = ev_now(loop) - row.tm; - replica->last_row_time = ev_now(loop); - - if (iproto_type_is_error(row.type)) - xrow_decode_error(&row); /* error */ - recovery_apply_row(r, &row); - - iobuf_reset(iobuf); - fiber_gc(); + assert(0); /* unreachable */ + break; } catch (ClientError *e) { - replica_set_status(replica, "stopped"); + replica_log_exception(replica, e); + evio_close(loop, coio); + iobuf_delete(iobuf); + replica_set_state(replica, REPLICA_STOPPED); throw; } catch (FiberCancelException *e) { - replica_set_status(replica, "off"); + evio_close(loop, coio); + iobuf_delete(iobuf); + replica_set_state(replica, REPLICA_OFF); throw; - } catch (Exception *e) { - replica_set_status(replica, "disconnected"); - if (!replica->warning_said) { - if (err != NULL) - say_info("%s", err); - e->log(); - say_info("will retry every %i second", - RECONNECT_DELAY); - replica->warning_said = true; - } - evio_close(loop, &coio); + } catch (SocketError *e) { + replica_log_exception(replica, e); + evio_close(loop, coio); + replica_set_state(replica, REPLICA_DISCONNECTED); + /* fall through */ } + if (!replica->warning_said) + say_info("will retry every %i second", RECONNECT_DELAY); + replica->warning_said = true; + iobuf_reset(iobuf); + fiber_gc(); + /* Put fiber_sleep() out of catch block. * * This is done to avoid situation, when two or more @@ -288,63 +333,71 @@ pull_from_replica(va_list ap) * * See: https://github.com/tarantool/tarantool/issues/136 */ - if (! evio_has_fd(&coio)) + + try { fiber_sleep(RECONNECT_DELAY); + } catch (FiberCancelException *e) { + /* Cleanup resources on fiber_cancel() */ + iobuf_delete(iobuf); + throw; + } } } void -recovery_follow_replica(struct recovery_state *r) +replica_start(struct replica *replica, struct recovery_state *r) { char name[FIBER_NAME_MAX]; - struct replica *replica = &r->replica; - assert(replica->reader == NULL); - assert(recovery_has_replica(r)); const char *uri = uri_format(&replica->uri); - say_crit("starting replication from %s", uri); + say_info("starting replication from %s", uri); snprintf(name, sizeof(name), "replica/%s", uri); - struct fiber *f = fiber_new(name, pull_from_replica); /** * So that we can safely grab the status of the * fiber any time we want. */ fiber_set_joinable(f, true); - replica->reader = f; - fiber_start(f, r); + fiber_start(f, replica, r); } void -recovery_stop_replica(struct recovery_state *r) +replica_stop(struct replica *replica) { - say_info("shutting down the replica"); - struct replica *replica = &r->replica; struct fiber *f = replica->reader; - replica->reader = NULL; + if (f == NULL) + return; + const char *uri = uri_format(&replica->uri); + say_info("shutting down the replica %s", uri); fiber_cancel(f); /** * If the replica died from an exception, don't throw it * up. */ diag_clear(&f->diag); - fiber_join(f); - replica->status = "off"; + fiber_join(f); /* doesn't throw due do diag_clear() */ + replica_set_state(replica, REPLICA_OFF); + replica->reader = NULL; } void -recovery_set_replica(struct recovery_state *r, const char *uri) +replica_join(struct replica *replica) { - /* First, stop the reader, then set the source */ - struct replica *replica = &r->replica; - assert(replica->reader == NULL); - if (uri == NULL) { - replica->source[0] = '\0'; - return; - } + assert(replica->reader != NULL); + auto fiber_guard = make_scoped_guard([=] { replica->reader = NULL; }); + fiber_join(replica->reader); /* may throw */ +} + +void +replica_create(struct replica *replica, const char *uri) +{ + memset(replica, 0, sizeof(*replica)); + replica->io.fd = -1; + + /* uri_parse() sets pointers to replica->source buffer */ snprintf(replica->source, sizeof(replica->source), "%s", uri); int rc = uri_parse(&replica->uri, replica->source); /* URI checked by box_check_replication_source() */ @@ -352,14 +405,9 @@ recovery_set_replica(struct recovery_state *r, const char *uri) (void) rc; } -bool -recovery_has_replica(struct recovery_state *r) -{ - return r->replica.source[0]; -} - void -recovery_init_replica(struct recovery_state *r) +replica_destroy(struct replica *replica) { - r->replica.status = "off"; + assert(replica->reader == NULL); + evio_close(loop(), &replica->io); } diff --git a/src/box/replica.h b/src/box/replica.h index c07ada85f7111550ede194d3b9668666b7e7062f..470527ebf325032df3529b354c8846e5f896b0cd 100644 --- a/src/box/replica.h +++ b/src/box/replica.h @@ -42,10 +42,26 @@ struct recovery_state; enum { REPLICA_SOURCE_MAXLEN = 1024 }; /* enough to fit URI with passwords */ -/** State of a replication connection to the master */ +#define replica_STATE(_) \ + _(REPLICA_OFF, 0) \ + _(REPLICA_CONNECT, 1) \ + _(REPLICA_AUTH, 2) \ + _(REPLICA_CONNECTED, 3) \ + _(REPLICA_BOOTSTRAP, 4) \ + _(REPLICA_FOLLOW, 5) \ + _(REPLICA_STOPPED, 6) \ + _(REPLICA_DISCONNECTED, 7) \ + +/** States for the replica */ +ENUM(replica_state, replica_STATE); +extern const char *replica_state_strs[]; + +/** + * State of a replication connection to the master + */ struct replica { struct fiber *reader; - const char *status; + enum replica_state state; ev_tstamp lag, last_row_time; bool warning_said; char source[REPLICA_SOURCE_MAXLEN]; @@ -55,30 +71,54 @@ struct replica { struct sockaddr_storage addrstorage; }; socklen_t addr_len; + /* save master fd to re-use a connection between JOIN and SUBSCRIBE */ + struct ev_io io; }; -/** Connect to a master and request a snapshot. - * Raises an exception on error. +/** + * Start a client to a remote server using a background fiber. * - * @return A connected socket, ready too receive - * data. + * If recovery is finalized (i.e. r->writer != NULL) then the client + * connect to a master and follow remote updates using SUBSCRIBE command. + * + * If recovery is not finalized (i.e. r->writer == NULL) then the client + * connect to a master, download and process snapshot using JOIN command + * and then exits. The background fiber can be joined to get exit status + * using replica_join(). + * + * \pre A connection from io->fd is re-used. + * \sa fiber_start() */ void -replica_bootstrap(struct recovery_state *r); +replica_start(struct replica *replica, struct recovery_state *r); +/** + * Stop a client. + */ void -recovery_follow_replica(struct recovery_state *r); +replica_stop(struct replica *replica); +/** + * Wait replication client to finish and rethrow exception (if any). + * Use this function to wait bootstrap. + * + * \post This function keeps a open connection in io->fd. + * \sa replica_start() + * \sa fiber_join() + */ void -recovery_stop_replica(struct recovery_state *r); +replica_join(struct replica *replica); +/** + * Create replica and initialize remote uri (copied to struct replica). + */ void -recovery_set_replica(struct recovery_state *r, const char *source); - -bool -recovery_has_replica(struct recovery_state *r); +replica_create(struct replica *replica, const char *uri); +/** + * Destroy replica. + */ void -recovery_init_replica(struct recovery_state *r); +replica_destroy(struct replica *replica); #endif /* TARANTOOL_REPLICA_H_INCLUDED */ diff --git a/src/coio.cc b/src/coio.cc index 7ebce0994ffdaf8d85939903f715ca48d4897e94..64298c745d4a0eccf53a393b36ff45b515038cc6 100644 --- a/src/coio.cc +++ b/src/coio.cc @@ -51,12 +51,12 @@ typedef void (*ev_stat_cb)(ev_loop *, ev_stat *, int); /** Note: this function does not throw */ void -coio_init(struct ev_io *coio) +coio_init(struct ev_io *coio, int fd) { /* Prepare for ev events. */ coio->data = fiber(); ev_init(coio, (ev_io_cb) fiber_schedule_cb); - coio->fd = -1; + coio->fd = fd; } static inline bool @@ -592,8 +592,7 @@ coio_service_on_accept(struct evio_service *evio_service, evio_service->on_accept_param; struct ev_io coio; - coio_init(&coio); - coio.fd = fd; + coio_init(&coio, fd); /* Set connection name. */ char fiber_name[SERVICE_NAME_MAXLEN]; diff --git a/src/coio.h b/src/coio.h index 7332fb6f051635c77d92d3957a119388d68f458e..9509ffeca13a053184e1865d801362408bfd21d1 100644 --- a/src/coio.h +++ b/src/coio.h @@ -66,7 +66,7 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr, socklen_t addrlen, ev_tstamp timeout); void -coio_init(struct ev_io *coio); +coio_init(struct ev_io *coio, int fd); ssize_t coio_read_ahead_timeout(struct ev_io *coio, void *buf, size_t sz, size_t bufsiz, diff --git a/test/replication/conflict.test.py b/test/replication/conflict.test.py index f482fa6e7d2e401d116eb84dd1d6ec8e3bddab5c..8d6e9347f1eed8ac6d1f43e4cd90c0e66332db32 100644 --- a/test/replication/conflict.test.py +++ b/test/replication/conflict.test.py @@ -70,7 +70,7 @@ parallel_run( "box.space.test:update(1, {{'#', 2, 1}})", [ lambda x,y: x == 'stopped' or y == 'stopped', - lambda x,y: x == 'connected' and y == 'connected', + lambda x,y: x == 'follow' and y == 'follow', ] ) check_replication([master, replica], '1') @@ -82,7 +82,7 @@ parallel_run( 'box.space.test:insert{20, 2}', [ lambda x,y: x == 'stopped' or y == 'stopped', - lambda x,y: x == 'connected' and y == 'connected', + lambda x,y: x == 'follow' and y == 'follow', ] ) @@ -91,7 +91,7 @@ prepare_cluster() parallel_run( "box.space.test:update(2, {{'=', 2, 1}})", "box.space.test:update(2, {{'=', 2, 2}})", - [lambda x,y: x == 'connected' and y == 'connected',] + [lambda x,y: x == 'follow' and y == 'follow',] ) # test4: CRDT increment with update @@ -99,7 +99,7 @@ prepare_cluster() parallel_run( "box.space.test:update(1, {{'+', 2, 1}})", "box.space.test:update(1, {{'+', 2, 2}})", - [lambda x,y: x == 'connected' and y == 'connected',] + [lambda x,y: x == 'follow' and y == 'follow',] ) check_replication([master, replica], '1') @@ -108,7 +108,7 @@ prepare_cluster() parallel_run( "box.space.test:delete(999)", "box.space.test:delete(999)", - [lambda x,y: x == 'connected' and y == 'connected',] + [lambda x,y: x == 'follow' and y == 'follow',] ) check_replication([master, replica])