Skip to content
Snippets Groups Projects
Commit ef45ecb9 authored by Roman Tsisyk's avatar Roman Tsisyk
Browse files

Add replicaset - a set for replicas by replication source.

Prepare for multi-master.
parent 80931b7b
No related branches found
No related tags found
No related merge requests found
......@@ -206,10 +206,13 @@ box_set_replication_source(void)
const char *source = cfg_gets("replication_source");
/* This hook is only invoked if source has changed */
if (replica != NULL) {
struct replica *replica = cluster_replica_first();
while (replica != NULL) {
struct replica *next = cluster_replica_next(replica);
replica_stop(replica); /* cancels a background fiber */
cluster_del_replica(replica);
replica_delete(replica);
replica = NULL;
replica = next; /* safe iteration with cluster_del_replica */
}
if (source == NULL)
......@@ -217,6 +220,7 @@ box_set_replication_source(void)
/* Start a new replication client using provided URI */
replica = replica_new(source);
cluster_add_replica(replica);
replica_start(replica, recovery); /* starts a background fiber */
}
......@@ -611,6 +615,7 @@ box_free(void)
*/
if (box_init_done) {
session_free();
cluster_free();
user_cache_free();
schema_free();
tuple_free();
......@@ -682,6 +687,8 @@ box_init(void)
*/
session_init();
cluster_init();
title("loading", NULL);
/* recovery initialization */
......@@ -692,6 +699,10 @@ box_init(void)
cfg_geti("panic_on_snap_error"),
cfg_geti("panic_on_wal_error"));
const char *source = cfg_gets("replication_source");
if (source != NULL) {
struct replica *replica = replica_new(source);
cluster_add_replica(replica);
}
if (recovery_has_data(recovery)) {
/* Tell Sophia engine LSN it must recover to. */
......@@ -708,8 +719,8 @@ box_init(void)
/* Add a surrogate server id for snapshot rows */
vclock_add_server(&recovery->vclock, 0);
/* Bootstrap from a remote master */
replica = replica_new(source);
/* Bootstrap from the first master */
struct replica *replica = cluster_replica_first();
replica_start(replica, recovery);
replica_wait(replica); /* throws on failure */
......@@ -747,9 +758,7 @@ box_init(void)
rmean_cleanup(rmean_box);
if (source != NULL) {
if (replica == NULL)
replica = replica_new(source);
cluster_foreach_replica(replica) {
/* Follow replica */
assert(recovery->writer);
replica_start(replica, recovery);
......
......@@ -39,7 +39,31 @@
*/
tt_uuid cluster_id;
struct replica *replica;
typedef rb_tree(struct replica) replicaset_t;
rb_proto(, replicaset_, replicaset_t, struct replica)
static int
replica_compare_by_source(const struct replica *a, const struct replica *b)
{
return strcmp(a->source, b->source);
}
rb_gen(, replicaset_, replicaset_t, struct replica, link,
replica_compare_by_source);
static replicaset_t replicaset; /* zeroed by linker */
void
cluster_init(void)
{
replicaset_new(&replicaset);
}
void
cluster_free(void)
{
}
extern "C" struct vclock *
cluster_clock()
......@@ -92,3 +116,35 @@ cluster_del_server(uint32_t server_id)
box_set_ro(true);
}
}
void
cluster_add_replica(struct replica *replica)
{
replicaset_insert(&replicaset, replica);
}
void
cluster_del_replica(struct replica *replica)
{
replicaset_remove(&replicaset, replica);
}
struct replica *
cluster_find_replica(const char *source)
{
struct replica key;
snprintf(key.source, sizeof(key.source), "%s", source);
return replicaset_search(&replicaset, &key);
}
struct replica *
cluster_replica_first(void)
{
return replicaset_first(&replicaset);
}
struct replica *
cluster_replica_next(struct replica *replica)
{
return replicaset_next(&replicaset, replica);
}
......@@ -79,6 +79,12 @@
* and is implemented in @file vclock.h
*/
void
cluster_init(void);
void
cluster_free(void);
/** {{{ Global cluster identifier API **/
/** UUID of the cluster. */
......@@ -87,8 +93,6 @@ extern tt_uuid cluster_id;
extern "C" struct vclock *
cluster_clock();
extern struct replica *replica;
/* }}} */
/** {{{ Cluster server id API **/
......@@ -114,4 +118,26 @@ cluster_del_server(uint32_t server_id);
/** }}} **/
/** {{{ Cluster replica API **/
void
cluster_add_replica(struct replica *replica);
void
cluster_del_replica(struct replica *replica);
struct replica *
cluster_find_replica(const char *source);
struct replica *
cluster_replica_first(void);
struct replica *
cluster_replica_next(struct replica *replica);
#define cluster_foreach_replica(var) \
for (struct replica *var = cluster_replica_first(); \
var != NULL; var = cluster_replica_next(var))
/** }}} **/
#endif
......@@ -52,6 +52,7 @@ lbox_info_replication(struct lua_State *L)
{
lua_newtable(L);
struct replica *replica = cluster_replica_first();
if (replica == NULL) {
lua_pushstring(L, "status");
lua_pushstring(L, "off");
......
......@@ -37,6 +37,8 @@
#include "trivia/util.h"
#include "uri.h"
#include "third_party/tarantool_ev.h"
#define RB_COMPACT 1
#include <third_party/rb.h>
struct recovery_state;
......@@ -65,6 +67,7 @@ struct replica {
ev_tstamp lag, last_row_time;
bool warning_said;
char source[REPLICA_SOURCE_MAXLEN];
rb_node(struct replica) link; /* a set by source in cluster.cc */
struct uri uri;
union {
struct sockaddr addr;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment