From 00a7a4046bb610536fd4e905a10a3dae637c822e Mon Sep 17 00:00:00 2001 From: Roman Tsisyk <roman@tsisyk.com> Date: Tue, 25 Aug 2015 15:01:15 +0300 Subject: [PATCH] Rename `struct remote' to `struct replica' No semantical code chagnes. --- src/box/box.cc | 22 ++++---- src/box/lua/info.cc | 11 ++-- src/box/recovery.cc | 6 +-- src/box/recovery.h | 24 +-------- src/box/replica.cc | 129 ++++++++++++++++++++++---------------------- src/box/replica.h | 35 ++++++++++-- 6 files changed, 118 insertions(+), 109 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index 9f872196df..6722e32f4d 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -166,25 +166,25 @@ extern "C" void box_set_replication_source(void) { const char *source = cfg_gets("replication_source"); - bool old_is_replica = recovery->remote.reader; + bool old_is_replica = recovery->replica.reader; bool new_is_replica = source != NULL; if (old_is_replica != new_is_replica || (old_is_replica && - (strcmp(source, recovery->remote.source) != 0))) { + (strcmp(source, recovery->replica.source) != 0))) { if (recovery->writer) { if (old_is_replica) - recovery_stop_remote(recovery); - recovery_set_remote(recovery, source); - if (recovery_has_remote(recovery)) - recovery_follow_remote(recovery); + recovery_stop_replica(recovery); + recovery_set_replica(recovery, source); + if (recovery_has_replica(recovery)) + recovery_follow_replica(recovery); } else { /* * Do nothing, we're in local hot * standby mode, the server * will automatically begin following - * the remote when local hot standby + * the replica when local hot standby * mode is finished, see * box_leave_local_hot_standby_mode() */ @@ -635,7 +635,7 @@ box_init(void) recovery = recovery_new(cfg_gets("snap_dir"), cfg_gets("wal_dir"), recover_row, NULL); - recovery_set_remote(recovery, + recovery_set_replica(recovery, cfg_gets("replication_source")); recovery_setup_panic(recovery, cfg_geti("panic_on_snap_error"), @@ -647,7 +647,7 @@ box_init(void) int64_t checkpoint_id = recovery_last_checkpoint(recovery); engine_recover_to_checkpoint(checkpoint_id); - } else if (recovery_has_remote(recovery)) { + } else if (recovery_has_replica(recovery)) { /* Initialize a new replica */ engine_begin_join(); replica_bootstrap(recovery); @@ -680,8 +680,8 @@ box_init(void) stat_cleanup(stat_base, IPROTO_TYPE_STAT_MAX); - if (recovery_has_remote(recovery)) - recovery_follow_remote(recovery); + if (recovery_has_replica(recovery)) + recovery_follow_replica(recovery); /* Enter read-write mode. */ if (recovery->server_id > 0) box_set_ro(false); diff --git a/src/box/lua/info.cc b/src/box/lua/info.cc index ad3a654154..3628417960 100644 --- a/src/box/lua/info.cc +++ b/src/box/lua/info.cc @@ -49,23 +49,24 @@ static int lbox_info_replication(struct lua_State *L) { struct recovery_state *r = recovery; + struct replica *replica = &r->replica; lua_newtable(L); lua_pushstring(L, "status"); - lua_pushstring(L, r->remote.status); + lua_pushstring(L, replica->status); lua_settable(L, -3); - if (r->remote.reader) { + if (replica->reader) { lua_pushstring(L, "lag"); - lua_pushnumber(L, r->remote.lag); + lua_pushnumber(L, replica->lag); lua_settable(L, -3); lua_pushstring(L, "idle"); - lua_pushnumber(L, ev_now(loop()) - r->remote.last_row_time); + lua_pushnumber(L, ev_now(loop()) - replica->last_row_time); lua_settable(L, -3); - Exception *e = diag_last_error(&r->remote.reader->diag); + Exception *e = diag_last_error(&replica->reader->diag); if (e != NULL) { lua_pushstring(L, "message"); lua_pushstring(L, e->errmsg()); diff --git a/src/box/recovery.cc b/src/box/recovery.cc index 05e70d4a73..ae76f2a4e6 100644 --- a/src/box/recovery.cc +++ b/src/box/recovery.cc @@ -91,8 +91,8 @@ * IR -> HS # recovery_follow_local() * IRR -> RR # recovery_follow_local() * HS -> M # recovery_finalize() - * M -> R # recovery_follow_remote() - * R -> M # recovery_stop_remote() + * M -> R # recovery_follow_replica() + * R -> M # recovery_stop_replica() */ /* {{{ LSN API */ @@ -172,7 +172,7 @@ recovery_new(const char *snap_dirname, const char *wal_dirname, xdir_check(&r->wal_dir); r->watcher = NULL; - recovery_init_remote(r); + recovery_init_replica(r); guard.is_active = false; return r; diff --git a/src/box/recovery.h b/src/box/recovery.h index a993a38b0f..134ca1d654 100644 --- a/src/box/recovery.h +++ b/src/box/recovery.h @@ -30,16 +30,13 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ -#include <netinet/in.h> -#include <sys/socket.h> - #include "trivia/util.h" #include "third_party/tarantool_ev.h" #include "xlog.h" #include "vclock.h" #include "tt_uuid.h" -#include "uri.h" #include "wal.h" +#include "replica.h" #if defined(__cplusplus) extern "C" { @@ -57,23 +54,6 @@ typedef void (apply_row_f)(struct recovery_state *, void *, struct wal_watcher; struct wal_writer; -enum { REMOTE_SOURCE_MAXLEN = 1024 }; /* enough to fit URI with passwords */ - -/** State of a replication connection to the master */ -struct remote { - struct fiber *reader; - const char *status; - ev_tstamp lag, last_row_time; - bool warning_said; - char source[REMOTE_SOURCE_MAXLEN]; - struct uri uri; - union { - struct sockaddr addr; - struct sockaddr_storage addrstorage; - }; - socklen_t addr_len; -}; - struct recovery_state { struct vclock vclock; /** The WAL we're currently reading/writing from/to. */ @@ -87,7 +67,7 @@ struct recovery_state { * locally or send to the replica. */ struct fiber *watcher; - struct remote remote; + struct replica replica; /** * apply_row is a module callback invoked during initial * recovery and when reading rows from the master. diff --git a/src/box/replica.cc b/src/box/replica.cc index 4d90b2b669..40626621bb 100644 --- a/src/box/replica.cc +++ b/src/box/replica.cc @@ -45,8 +45,8 @@ static const int RECONNECT_DELAY = 1.0; static void -remote_read_row(struct ev_io *coio, struct iobuf *iobuf, - struct xrow_header *row) +replica_read_row(struct ev_io *coio, struct iobuf *iobuf, + struct xrow_header *row) { struct ibuf *in = &iobuf->in; @@ -74,7 +74,7 @@ remote_read_row(struct ev_io *coio, struct iobuf *iobuf, } static void -remote_write_row(struct ev_io *coio, const struct xrow_header *row) +replica_write_row(struct ev_io *coio, const struct xrow_header *row) { struct iovec iov[XROW_IOVMAX]; int iovcnt = xrow_to_iovec(row, iov); @@ -82,32 +82,32 @@ remote_write_row(struct ev_io *coio, const struct xrow_header *row) } static void -remote_connect(struct recovery_state *r, struct ev_io *coio, - struct iobuf *iobuf) +replica_connect(struct recovery_state *r, struct ev_io *coio, + struct iobuf *iobuf) { char greeting[IPROTO_GREETING_SIZE]; - struct remote *remote = &r->remote; - struct uri *uri = &r->remote.uri; + struct replica *replica = &r->replica; + struct uri *uri = &replica->uri; /* - * coio_connect() stores resolved address to \a &remote->addr - * on success. &remote->addr_len is a value-result argument which + * coio_connect() stores resolved address to \a &replica->addr + * on success. &replica->addr_len is a value-result argument which * must be initialized to the size of associated buffer (addrstorage) * before calling coio_connect(). Since coio_connect() performs * DNS resolution under the hood it is theoretically possible that - * remote->addr_len will be different even for same uri. + * replica->addr_len will be different even for same uri. */ - remote->addr_len = sizeof(remote->addrstorage); + replica->addr_len = sizeof(replica->addrstorage); /* Prepare null-terminated strings for coio_connect() */ - coio_connect(coio, uri, &remote->addr, &remote->addr_len); + coio_connect(coio, uri, &replica->addr, &replica->addr_len); assert(coio->fd >= 0); coio_readn(coio, greeting, sizeof(greeting)); - say_crit("connected to %s", sio_strfaddr(&remote->addr, - remote->addr_len)); + say_crit("connected to %s", sio_strfaddr(&replica->addr, + replica->addr_len)); /* Perform authentication if user provided at least login */ - if (!r->remote.uri.login) + if (!replica->uri.login) return; /* Authenticate */ @@ -116,8 +116,8 @@ remote_connect(struct recovery_state *r, struct ev_io *coio, xrow_encode_auth(&row, greeting, uri->login, uri->login_len, uri->password, uri->password_len); - remote_write_row(coio, &row); - remote_read_row(coio, iobuf, &row); + replica_write_row(coio, &row); + replica_read_row(coio, iobuf, &row); if (row.type != IPROTO_OK) xrow_decode_error(&row); /* auth failed */ @@ -129,7 +129,8 @@ void replica_bootstrap(struct recovery_state *r) { say_info("bootstrapping a replica"); - assert(recovery_has_remote(r)); + struct replica *replica = &r->replica; + assert(recovery_has_replica(r)); /* Generate Server-UUID */ tt_uuid_create(&r->server_uuid); @@ -144,18 +145,18 @@ replica_bootstrap(struct recovery_state *r) for (;;) { try { - remote_connect(r, &coio, iobuf); - r->remote.warning_said = false; + replica_connect(r, &coio, iobuf); + replica->warning_said = false; break; } catch (FiberCancelException *e) { throw; } catch (Exception *e) { - if (! r->remote.warning_said) { + if (! replica->warning_said) { say_error("can't connect to master"); e->log(); say_info("will retry every %i second", RECONNECT_DELAY); - r->remote.warning_said = true; + replica->warning_said = true; } iobuf_reset(iobuf); evio_close(loop(), &coio); @@ -166,13 +167,13 @@ replica_bootstrap(struct recovery_state *r) /* Send JOIN request */ struct xrow_header row; xrow_encode_join(&row, &r->server_uuid); - remote_write_row(&coio, &row); + replica_write_row(&coio, &row); /* Add a surrogate server id for snapshot rows */ vclock_add_server(&r->vclock, 0); while (true) { - remote_read_row(&coio, iobuf, &row); + replica_read_row(&coio, iobuf, &row); if (row.type == IPROTO_OK) { /* End of stream */ @@ -199,15 +200,16 @@ replica_bootstrap(struct recovery_state *r) } static void -remote_set_status(struct remote *remote, const char *status) +replica_set_status(struct replica *replica, const char *status) { - remote->status = status; + replica->status = status; } static void -pull_from_remote(va_list ap) +pull_from_replica(va_list ap) { struct recovery_state *r = va_arg(ap, struct recovery_state *); + struct replica *replica = &r->replica; struct ev_io coio; struct iobuf *iobuf = iobuf_new(); ev_loop *loop = loop(); @@ -224,16 +226,16 @@ pull_from_remote(va_list ap) try { struct xrow_header row; if (! evio_has_fd(&coio)) { - remote_set_status(&r->remote, "connecting"); + replica_set_status(replica, "connecting"); err = "can't connect to master"; - remote_connect(r, &coio, iobuf); + replica_connect(r, &coio, iobuf); /* Send SUBSCRIBE request */ err = "can't subscribe to master"; xrow_encode_subscribe(&row, &cluster_id, &r->server_uuid, &r->vclock); - remote_write_row(&coio, &row); - r->remote.warning_said = false; - remote_set_status(&r->remote, "connected"); + replica_write_row(&coio, &row); + replica->warning_said = false; + replica_set_status(replica, "connected"); } err = "can't read row"; /** @@ -243,11 +245,10 @@ pull_from_remote(va_list ap) * "OK" response, but a stream of rows. * from the binary log. */ - remote_read_row(&coio, iobuf, &row); + replica_read_row(&coio, iobuf, &row); err = NULL; - r->remote.lag = ev_now(loop) - row.tm; - r->remote.last_row_time = - ev_now(loop); + replica->lag = ev_now(loop) - row.tm; + replica->last_row_time = ev_now(loop); if (iproto_type_is_error(row.type)) xrow_decode_error(&row); /* error */ @@ -256,20 +257,20 @@ pull_from_remote(va_list ap) iobuf_reset(iobuf); fiber_gc(); } catch (ClientError *e) { - remote_set_status(&r->remote, "stopped"); + replica_set_status(replica, "stopped"); throw; } catch (FiberCancelException *e) { - remote_set_status(&r->remote, "off"); + replica_set_status(replica, "off"); throw; } catch (Exception *e) { - remote_set_status(&r->remote, "disconnected"); - if (! r->remote.warning_said) { + replica_set_status(replica, "disconnected"); + if (!replica->warning_said) { if (err != NULL) say_info("%s", err); e->log(); say_info("will retry every %i second", RECONNECT_DELAY); - r->remote.warning_said = true; + replica->warning_said = true; } evio_close(loop, &coio); } @@ -293,70 +294,72 @@ pull_from_remote(va_list ap) } void -recovery_follow_remote(struct recovery_state *r) +recovery_follow_replica(struct recovery_state *r) { char name[FIBER_NAME_MAX]; + struct replica *replica = &r->replica; - assert(r->remote.reader == NULL); - assert(recovery_has_remote(r)); + assert(replica->reader == NULL); + assert(recovery_has_replica(r)); - const char *uri = uri_format(&r->remote.uri); + const char *uri = uri_format(&replica->uri); say_crit("starting replication from %s", uri); snprintf(name, sizeof(name), "replica/%s", uri); - struct fiber *f = fiber_new(name, pull_from_remote); + struct fiber *f = fiber_new(name, pull_from_replica); /** * So that we can safely grab the status of the * fiber any time we want. */ fiber_set_joinable(f, true); - r->remote.reader = f; + replica->reader = f; fiber_start(f, r); } void -recovery_stop_remote(struct recovery_state *r) +recovery_stop_replica(struct recovery_state *r) { say_info("shutting down the replica"); - struct fiber *f = r->remote.reader; - r->remote.reader = NULL; + struct replica *replica = &r->replica; + struct fiber *f = replica->reader; + replica->reader = NULL; fiber_cancel(f); /** - * If the remote died from an exception, don't throw it + * If the replica died from an exception, don't throw it * up. */ diag_clear(&f->diag); fiber_join(f); - r->remote.status = "off"; + replica->status = "off"; } void -recovery_set_remote(struct recovery_state *r, const char *uri) +recovery_set_replica(struct recovery_state *r, const char *uri) { /* First, stop the reader, then set the source */ - assert(r->remote.reader == NULL); + struct replica *replica = &r->replica; + assert(replica->reader == NULL); if (uri == NULL) { - r->remote.source[0] = '\0'; + replica->source[0] = '\0'; return; } - snprintf(r->remote.source, sizeof(r->remote.source), "%s", uri); - struct remote *remote = &r->remote; - int rc = uri_parse(&remote->uri, r->remote.source); + snprintf(replica->source, sizeof(replica->source), "%s", uri); + int rc = uri_parse(&replica->uri, replica->source); /* URI checked by box_check_replication_source() */ - assert(rc == 0 && remote->uri.service != NULL); + assert(rc == 0 && replica->uri.service != NULL); (void) rc; } bool -recovery_has_remote(struct recovery_state *r) +recovery_has_replica(struct recovery_state *r) { - return r->remote.source[0]; + return r->replica.source[0]; } void -recovery_init_remote(struct recovery_state *r) +recovery_init_replica(struct recovery_state *r) { - r->remote.status = "off"; + r->replica.status = "off"; } diff --git a/src/box/replica.h b/src/box/replica.h index 78dc87b660..c07ada85f7 100644 --- a/src/box/replica.h +++ b/src/box/replica.h @@ -31,7 +31,32 @@ * SUCH DAMAGE. */ +#include <netinet/in.h> +#include <sys/socket.h> + +#include "trivia/util.h" +#include "uri.h" +#include "third_party/tarantool_ev.h" + struct recovery_state; + +enum { REPLICA_SOURCE_MAXLEN = 1024 }; /* enough to fit URI with passwords */ + +/** State of a replication connection to the master */ +struct replica { + struct fiber *reader; + const char *status; + ev_tstamp lag, last_row_time; + bool warning_said; + char source[REPLICA_SOURCE_MAXLEN]; + struct uri uri; + union { + struct sockaddr addr; + struct sockaddr_storage addrstorage; + }; + socklen_t addr_len; +}; + /** Connect to a master and request a snapshot. * Raises an exception on error. * @@ -42,18 +67,18 @@ void replica_bootstrap(struct recovery_state *r); void -recovery_follow_remote(struct recovery_state *r); +recovery_follow_replica(struct recovery_state *r); void -recovery_stop_remote(struct recovery_state *r); +recovery_stop_replica(struct recovery_state *r); void -recovery_set_remote(struct recovery_state *r, const char *source); +recovery_set_replica(struct recovery_state *r, const char *source); bool -recovery_has_remote(struct recovery_state *r); +recovery_has_replica(struct recovery_state *r); void -recovery_init_remote(struct recovery_state *r); +recovery_init_replica(struct recovery_state *r); #endif /* TARANTOOL_REPLICA_H_INCLUDED */ -- GitLab