Skip to content
Snippets Groups Projects
Commit 09d4e13d authored by Konstantin Shulgin's avatar Konstantin Shulgin
Browse files

Implement blueprint: 'feature-feeder-in-core'.

Missed inotify even in the replicaor handler process was foxed.
replicaor_init was splited on two function:
  - replicaor_prefork -- create replicaor spawner process;
  - replicaor_init -- initialize replicaor fibers in the main tarantool
	                  process.

Test and test configuration files in box_replication soute was
clenupped and updated.
parent a5318c65
No related branches found
No related tags found
No related merge requests found
...@@ -52,17 +52,12 @@ struct replicator_process { ...@@ -52,17 +52,12 @@ struct replicator_process {
u32 child_count; u32 child_count;
}; };
static int replicator_socks[2];
/*-----------------------------------------------------------------------------*/ /*-----------------------------------------------------------------------------*/
/* replication accept/sender fibers */ /* replication accept/sender fibers */
/*-----------------------------------------------------------------------------*/ /*-----------------------------------------------------------------------------*/
/**
* Initialize replication fibers.
*/
static void
fibers_init(int sock);
/** /**
* Replication acceptor fiber handler. * Replication acceptor fiber handler.
*/ */
...@@ -201,13 +196,10 @@ void ...@@ -201,13 +196,10 @@ void
replicator_reload_config(struct tarantool_cfg *config __attribute__((unused))) replicator_reload_config(struct tarantool_cfg *config __attribute__((unused)))
{} {}
/** Intialize tarantool's replicator module. */ /** Pre-fork replicator spawner process. */
void void
replicator_init(void) replicator_prefork()
{ {
int socks[2];
pid_t pid = -1;
if (cfg.replication_port == 0) { if (cfg.replication_port == 0) {
/* replicator not needed, leave init function */ /* replicator not needed, leave init function */
return; return;
...@@ -218,35 +210,29 @@ replicator_init(void) ...@@ -218,35 +210,29 @@ replicator_init(void)
} }
/* create communication sockes between tarantool and replicator processes via UNIX sockets*/ /* create communication sockes between tarantool and replicator processes via UNIX sockets*/
if (socketpair(PF_LOCAL, SOCK_STREAM, 0, socks) != 0) { if (socketpair(PF_LOCAL, SOCK_STREAM, 0, replicator_socks) != 0) {
panic_syserror("socketpair"); panic_syserror("socketpair");
} }
/* create replicator process */ /* create replicator process */
pid = fork(); pid_t pid = fork();
if (pid == -1) { if (pid == -1) {
panic("fork"); panic_syserror("fork");
} }
if (pid != 0) { if (pid != 0) {
/* parent process: tarantool */ /* parent process: tarantool */
close(socks[1]); close(replicator_socks[1]);
fibers_init(socks[0]);
} else { } else {
/* child process: replicator */ /* child process: replicator */
close(socks[0]); close(replicator_socks[0]);
spawner_init(socks[1]); spawner_init(replicator_socks[1]);
} }
} }
/** Intialize tarantool's replicator module. */
/*-----------------------------------------------------------------------------*/ void
/* replication accept/sender fibers */ replicator_init()
/*-----------------------------------------------------------------------------*/
/** Initialize replication fibers. */
static void
fibers_init(int sock)
{ {
char fiber_name[FIBER_NAME_MAXLEN]; char fiber_name[FIBER_NAME_MAXLEN];
const size_t sender_inbox_size = 16 * sizeof(int); const size_t sender_inbox_size = 16 * sizeof(int);
...@@ -258,7 +244,7 @@ fibers_init(int sock) ...@@ -258,7 +244,7 @@ fibers_init(int sock)
panic("snprintf fail"); panic("snprintf fail");
} }
sender = fiber_create(fiber_name, sock, sender_inbox_size, sender_handler, NULL); sender = fiber_create(fiber_name, replicator_socks[0], sender_inbox_size, sender_handler, NULL);
if (sender == NULL) { if (sender == NULL) {
panic("create fiber fail"); panic("create fiber fail");
} }
...@@ -277,6 +263,11 @@ fibers_init(int sock) ...@@ -277,6 +263,11 @@ fibers_init(int sock)
fiber_call(sender); fiber_call(sender);
} }
/*-----------------------------------------------------------------------------*/
/* replication accept/sender fibers */
/*-----------------------------------------------------------------------------*/
/** Replication acceptor fiber handler. */ /** Replication acceptor fiber handler. */
static void static void
acceptor_handler(void *data) acceptor_handler(void *data)
......
...@@ -539,6 +539,9 @@ main(int argc, char **argv) ...@@ -539,6 +539,9 @@ main(int argc, char **argv)
say_logger_init(cfg.logger_nonblock); say_logger_init(cfg.logger_nonblock);
booting = false; booting = false;
initialize(cfg.slab_alloc_arena, cfg.slab_alloc_minimal, cfg.slab_alloc_factor);
replicator_prefork();
ev_default_loop(EVFLAG_AUTO); ev_default_loop(EVFLAG_AUTO);
ev_signal *ev_sig; ev_signal *ev_sig;
...@@ -546,7 +549,6 @@ main(int argc, char **argv) ...@@ -546,7 +549,6 @@ main(int argc, char **argv)
ev_signal_init(ev_sig, (void *)snapshot, SIGUSR1); ev_signal_init(ev_sig, (void *)snapshot, SIGUSR1);
ev_signal_start(ev_sig); ev_signal_start(ev_sig);
initialize(cfg.slab_alloc_arena, cfg.slab_alloc_minimal, cfg.slab_alloc_factor);
signal_init(); signal_init();
replicator_init(); replicator_init();
......
...@@ -46,13 +46,19 @@ replicator_check_config(struct tarantool_cfg *config); ...@@ -46,13 +46,19 @@ replicator_check_config(struct tarantool_cfg *config);
void void
replicator_reload_config(struct tarantool_cfg *config); replicator_reload_config(struct tarantool_cfg *config);
/**
* Pre-fork replicator spawner process.
*/
void
replicator_prefork();
/** /**
* Intialize tarantool's replicator module. * Intialize tarantool's replicator module.
* *
* @return On success, a zero is returned. On error, -1 is returned. * @return On success, a zero is returned. On error, -1 is returned.
*/ */
void void
replicator_init(void); replicator_init();
#endif // !defined(REPLICATOR_H_INCLUDED) #endif // !defined(REPLICATOR_H_INCLUDED)
pid_file = "tarantool.pid" pid_file = "tarantool.pid"
logger="tee -a tarantool.log" logger="cat - >> tarantool.log"
logger_nonblock=0
log_level = 5
bind_ipaddr="INADDR_ANY" bind_ipaddr="INADDR_ANY"
......
pid_file = "tarantool.pid" pid_file = "tarantool.pid"
logger="tee -a tarantool.log" logger="cat - >> tarantool.log"
logger_nonblock=0
log_level = 5 log_level = 5
bind_ipaddr="INADDR_ANY" bind_ipaddr="INADDR_ANY"
...@@ -16,3 +17,4 @@ namespace[0].index[0].type = "HASH" ...@@ -16,3 +17,4 @@ namespace[0].index[0].type = "HASH"
namespace[0].index[0].unique = 1 namespace[0].index[0].unique = 1
namespace[0].index[0].key_field[0].fieldno = 0 namespace[0].index[0].key_field[0].fieldno = 0
namespace[0].index[0].key_field[0].type = "NUM" namespace[0].index[0].key_field[0].type = "NUM"
pid_file = "tarantool.pid" pid_file = "tarantool.pid"
logger="tee -a tarantool.log" logger="cat - >> tarantool.log"
logger_nonblock=0
log_level = 5 log_level = 5
bind_ipaddr="INADDR_ANY" bind_ipaddr="INADDR_ANY"
......
pid_file = "tarantool.pid" pid_file = "tarantool.pid"
logger="tee -a tarantool.log" logger="cat - >> tarantool.log"
logger_nonblock=0
log_level = 5 log_level = 5
bind_ipaddr="INADDR_ANY" bind_ipaddr="INADDR_ANY"
......
pid_file = "tarantool.pid" pid_file = "tarantool.pid"
logger="tee -a tarantool.log" logger="cat - >> tarantool.log"
logger_nonblock=0
log_level = 5 log_level = 5
bind_ipaddr="INADDR_ANY" bind_ipaddr="INADDR_ANY"
......
...@@ -38,7 +38,7 @@ for i in range(id, id + 10): ...@@ -38,7 +38,7 @@ for i in range(id, id + 10):
print """ print """
# Select 10 tuples from replica # Select 10 tuples from replica
""" """
replica.wait_lsn(10) replica.wait_lsn(11)
for i in range(id, id + 10): for i in range(id, id + 10):
replica.sql.execute("select * from t0 where k0 = %d" % i, silent=False) replica.sql.execute("select * from t0 where k0 = %d" % i, silent=False)
...@@ -70,7 +70,7 @@ for i in range(id, id + 10): ...@@ -70,7 +70,7 @@ for i in range(id, id + 10):
print """ print """
# Select 10 tuples from replica # Select 10 tuples from replica
""" """
replica.wait_lsn(20) replica.wait_lsn(21)
for i in range(id, id + 10): for i in range(id, id + 10):
replica.sql.execute("select * from t0 where k0 = %d" % i, silent=False) replica.sql.execute("select * from t0 where k0 = %d" % i, silent=False)
......
This diff is collapsed.
# encoding: tarantool # encoding: tarantool
import os import os
import time
from lib.tarantool_box_server import TarantoolBoxServer from lib.tarantool_box_server import TarantoolBoxServer
REPEAT = 10 REPEAT = 10
ID_BEGIN = 0 ID_BEGIN = 0
ID_STEP = 10 ID_STEP = 5
def insert_tuples(server, begin, end, msg = "tuple"): def insert_tuples(server, begin, end, msg = "tuple"):
for i in range(begin, end): for i in range(begin, end):
...@@ -27,7 +28,6 @@ replica.deploy("box_replication/cfg/replica.cfg", ...@@ -27,7 +28,6 @@ replica.deploy("box_replication/cfg/replica.cfg",
id = ID_BEGIN id = ID_BEGIN
for i in range(REPEAT): for i in range(REPEAT):
summary = 0.0
print "test %d iteration" % i print "test %d iteration" % i
# insert to master # insert to master
...@@ -36,24 +36,35 @@ for i in range(REPEAT): ...@@ -36,24 +36,35 @@ for i in range(REPEAT):
select_tuples(replica, id, id + ID_STEP) select_tuples(replica, id, id + ID_STEP)
id += ID_STEP id += ID_STEP
# insert to master
insert_tuples(master, id, id + ID_STEP)
# select from replica
select_tuples(replica, id, id + ID_STEP)
id += ID_STEP
print "swap servers" print "swap servers"
# reconfigure replica to master # reconfigure replica to master
replica.reconfigure("box_replication/cfg/replica_to_master.cfg", silent = False) replica.reconfigure("box_replication/cfg/replica_to_master.cfg", silent = False)
# reconfigure master to replica # reconfigure master to replica
master.reconfigure("box_replication/cfg/master_to_replica.cfg", silent = False) master.reconfigure("box_replication/cfg/master_to_replica.cfg", silent = False)
# insert to replica
insert_tuples(replica, id, id + ID_STEP)
# select from master
select_tuples(master, id, id + ID_STEP)
id += ID_STEP
# insert to master # insert to replica
insert_tuples(replica, id, id + ID_STEP) insert_tuples(replica, id, id + ID_STEP)
# select from replica # select from master
select_tuples(master, id, id + ID_STEP) select_tuples(master, id, id + ID_STEP)
id += ID_STEP
print "rollback servers configuration" print "rollback servers configuration"
# reconfigure replica to master # reconfigure replica to master
master.reconfigure("box_replication/cfg/master.cfg", silent = False) master.reconfigure("box_replication/cfg/master.cfg", silent = False)
# reconfigure master to replica # reconfigure master to replica
replica.reconfigure("box_replication/cfg/replica.cfg", silent = False) replica.reconfigure("box_replication/cfg/replica.cfg", silent = False)
id += ID_STEP
# Cleanup. # Cleanup.
......
...@@ -41,10 +41,9 @@ class TarantoolBoxServer(TarantoolServer): ...@@ -41,10 +41,9 @@ class TarantoolBoxServer(TarantoolServer):
return info[param] return info[param]
def wait_lsn(self, lsn): def wait_lsn(self, lsn):
try_count = 0
while True: while True:
curr_lsn = int(self.get_param("lsn")) curr_lsn = int(self.get_param("lsn"))
if (curr_lsn >= lsn): if (curr_lsn >= lsn):
break break
try_count += 1
time.sleep(0.01) time.sleep(0.01)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment