diff --git a/src/box/box.cc b/src/box/box.cc index fbe75d450ff8d29086a08ff46dca689c00b996ed..5f079c7e811580fecec6fc870c593077dc56f2ba 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -58,6 +58,8 @@ static void process_ro(struct port *port, struct request *request); static void process_rw(struct port *port, struct request *request); box_process_func box_process = process_ro; +struct recovery_state *recovery; + static int stat_base; int snapshot_pid = 0; /* snapshot processes pid */ @@ -107,8 +109,11 @@ box_is_ro(void) } static void -recover_row(void *param __attribute__((unused)), struct xrow_header *row) +recover_row(struct recovery_state *r, void *param, struct xrow_header *row) { + (void) param; + (void) r; + assert(r == recovery); assert(row->bodycnt == 1); /* always 1 for read */ struct request request; request_create(&request, row->type); @@ -324,6 +329,26 @@ box_on_cluster_join(const tt_uuid *server_uuid) (unsigned) server_id, tt_uuid_str(server_uuid)); } +void +box_process_join(struct xrow_header *header) +{ + assert(header->type == IPROTO_JOIN); + struct tt_uuid server_uuid = uuid_nil; + xrow_decode_join(header, &server_uuid); + + box_on_cluster_join(&server_uuid); + + /* process JOIN request via replication relay */ + replication_join(session()->fd, header); +} + +void +box_process_subscribe(struct xrow_header *header) +{ + /* process SUBSCRIBE request via replication relay */ + replication_subscribe(session()->fd, header); +} + /** Replace the current server id in _cluster */ static void box_set_server_uuid() @@ -356,10 +381,13 @@ box_set_cluster_uuid() void box_free(void) { + if (recovery == NULL) + return; user_cache_free(); schema_free(); tuple_free(); - recovery_free(); + recovery_delete(recovery); + recovery = NULL; engine_shutdown(); stat_free(); session_free(); @@ -396,9 +424,8 @@ box_init() user_cache_init(); /* recovery initialization */ - recovery_init(cfg_gets("snap_dir"), cfg_gets("wal_dir"), - recover_row, NULL, box_snapshot_cb, box_on_cluster_join, - cfg_geti("rows_per_wal")); + recovery = recovery_new(cfg_gets("snap_dir"), cfg_gets("wal_dir"), + recover_row, NULL, box_snapshot_cb, cfg_geti("rows_per_wal")); recovery_set_remote(recovery, cfg_gets("replication_source")); recovery_update_io_rate_limit(recovery, cfg_getd("snap_io_rate_limit")); @@ -450,6 +477,15 @@ box_init() iobuf_set_readahead(cfg_geti("readahead")); } + +void +box_atfork() +{ + if (recovery == NULL) + return; + recovery_atfork(recovery); +} + static void snapshot_write_tuple(struct log_io *l, uint32_t n, struct tuple *tuple) @@ -470,7 +506,7 @@ snapshot_write_tuple(struct log_io *l, row.body[0].iov_len = sizeof(body); row.body[1].iov_base = tuple->data; row.body[1].iov_len = tuple->bsize; - snapshot_write_row(l, &row); + snapshot_write_row(recovery, l, &row); } static void diff --git a/src/box/box.h b/src/box/box.h index 3658142c85d13197c062b13f615177b5258b6b1b..a2ebc2f28912092ab442fe9d7ac43e815ab01b23 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -52,6 +52,10 @@ void box_init(); /** To be called at program end. */ void box_free(void); +/** A pthread_atfork() callback for box */ +void +box_atfork(); + /** * The main entry point to the * Box: callbacks into the request processor. @@ -96,6 +100,12 @@ const char *box_status(void); void box_leave_local_standby_mode(void *data __attribute__((unused))); +void +box_process_join(struct xrow_header *header); + +void +box_process_subscribe(struct xrow_header *header); + /** * Check Lua configuration before initialization or * in case of a configuration change. @@ -112,6 +122,8 @@ void box_set_io_collect_interval(double interval); void box_set_snap_io_rate_limit(double limit); void box_set_too_long_threshold(double threshold); +extern struct recovery_state *recovery; + #if defined(__cplusplus) } #endif /* defined(__cplusplus) */ diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 512d11f0b43a1f5b9de7661b1d94edd88b5b7de8..0c2470153e45f8f6d8e5cf9a6cac2d79ad3e5974 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -42,7 +42,6 @@ #include "scoped_guard.h" #include "memory.h" #include "msgpuck/msgpuck.h" -#include "replication.h" #include "session.h" #include "third_party/base64.h" #include "coio.h" @@ -710,12 +709,12 @@ iproto_process_admin(struct iproto_request *ireq) ireq->header.sync); break; case IPROTO_JOIN: - replication_join(con->input.fd, &ireq->header); + box_process_join(&ireq->header); /* TODO: check requests in `con; queue */ iproto_connection_shutdown(con); return; case IPROTO_SUBSCRIBE: - replication_subscribe(con->input.fd, &ireq->header); + box_process_subscribe(&ireq->header); /* TODO: check requests in `con; queue */ iproto_connection_shutdown(con); return; diff --git a/src/box/recovery.cc b/src/box/recovery.cc index e782e96bf8cd11a01a5713ca83a1f1e8be48ddfb..e39be264d3d208425eacaa356b4b01ce4263a7cc 100644 --- a/src/box/recovery.cc +++ b/src/box/recovery.cc @@ -111,8 +111,6 @@ * R -> S # snapshot() */ -struct recovery_state *recovery; - const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL }; /* {{{ LSN API */ @@ -152,15 +150,14 @@ wal_writer_stop(struct recovery_state *r); static void recovery_stop_local(struct recovery_state *r); -void -recovery_init(const char *snap_dirname, const char *wal_dirname, - row_handler row_handler, void *row_handler_param, - snapshot_handler snapshot_handler, join_handler join_handler, - int rows_per_wal) +struct recovery_state * +recovery_new(const char *snap_dirname, const char *wal_dirname, + row_handler row_handler, void *row_handler_param, + snapshot_handler snapshot_handler, + int rows_per_wal) { - assert(recovery == NULL); - recovery = (struct recovery_state *) calloc(1, sizeof(struct recovery_state)); - struct recovery_state *r = recovery; + struct recovery_state *r = (struct recovery_state *) + calloc(1, sizeof(*r)); recovery_update_mode(r, WAL_NONE); assert(rows_per_wal > 1); @@ -170,7 +167,6 @@ recovery_init(const char *snap_dirname, const char *wal_dirname, r->signature = -1; r->snapshot_handler = snapshot_handler; - r->join_handler = join_handler; log_dir_create(&r->snap_dir, snap_dirname, SNAP); @@ -187,6 +183,8 @@ recovery_init(const char *snap_dirname, const char *wal_dirname, panic("can't scan snapshot directory"); if (log_dir_scan(&r->wal_dir) != 0) panic("can't scan WAL directory"); + + return r; } void @@ -205,12 +203,8 @@ recovery_update_io_rate_limit(struct recovery_state *r, double new_limit) } void -recovery_free() +recovery_delete(struct recovery_state *r) { - struct recovery_state *r = recovery; - if (r == NULL) - return; - if (r->watcher) recovery_stop_local(r); @@ -226,7 +220,6 @@ recovery_free() */ log_io_close(&r->current_wal); } - recovery= NULL; } void @@ -247,7 +240,7 @@ recovery_process(struct recovery_state *r, struct xrow_header *row) return; } - return r->row_handler(r->row_handler_param, row); + return r->row_handler(r, r->row_handler_param, row); } void @@ -683,8 +676,6 @@ struct wal_writer struct vclock vclock; }; -static pthread_once_t wal_writer_once = PTHREAD_ONCE_INIT; - static struct wal_writer wal_writer; /** @@ -692,31 +683,22 @@ static struct wal_writer wal_writer; * fork the master process to save a snapshot, and in the child * the WAL writer thread is not necessary and not present. */ -static void -wal_writer_child() +void +recovery_atfork(struct recovery_state *r) { - log_io_atfork(&recovery->current_wal); - if (wal_writer.batch) { - free(wal_writer.batch); - wal_writer.batch = NULL; + log_io_atfork(&r->current_wal); + if (r->writer == NULL) + return; + if (r->writer->batch) { + free(r->writer->batch); + r->writer->batch = NULL; } /* * Make sure that atexit() handlers in the child do * not try to stop the non-existent thread. * The writer is not used in the child. */ - recovery->writer = NULL; -} - -/** - * Today a WAL writer is started once at start of the - * server. Nevertheless, use pthread_once() to make - * sure we can start/stop the writer many times. - */ -static void -wal_writer_init_once() -{ - (void) tt_pthread_atfork(NULL, NULL, wal_writer_child); + r->writer = NULL; } /** @@ -802,8 +784,6 @@ wal_writer_init(struct wal_writer *writer, struct vclock *vclock) writer->write_event.data = writer; writer->txn_loop = loop(); - (void) tt_pthread_once(&wal_writer_once, wal_writer_init_once); - writer->batch = fio_batch_alloc(sysconf(_SC_IOV_MAX)); if (writer->batch == NULL) @@ -1118,13 +1098,13 @@ wal_write(struct recovery_state *r, struct xrow_header *row) /* {{{ box.snapshot() */ void -snapshot_write_row(struct log_io *l, struct xrow_header *row) +snapshot_write_row(struct recovery_state *r, struct log_io *l, + struct xrow_header *row) { static uint64_t bytes; ev_tstamp elapsed; static ev_tstamp last = 0; ev_loop *loop = loop(); - struct recovery_state *r = recovery; row->tm = last; row->server_id = 0; diff --git a/src/box/recovery.h b/src/box/recovery.h index 60fff69fa0e90325a6225b310dac179d52a17d9e..e982d8291bcc8b1801cdac8e48a8cb4bb2a32f79 100644 --- a/src/box/recovery.h +++ b/src/box/recovery.h @@ -44,10 +44,11 @@ extern "C" { struct fiber; struct tbuf; +struct recovery_state; -typedef void (row_handler)(void *, struct xrow_header *packet); +typedef void (row_handler)(struct recovery_state *, void *, + struct xrow_header *packet); typedef void (snapshot_handler)(struct log_io *); -typedef void (join_handler)(const struct tt_uuid *node_uuid); /** A "condition variable" that allows fibers to wait when a given * LSN makes it to disk. @@ -81,7 +82,6 @@ struct recovery_state { row_handler *row_handler; void *row_handler_param; snapshot_handler *snapshot_handler; - join_handler *join_handler; uint64_t snap_io_rate_limit; int rows_per_wal; enum wal_mode wal_mode; @@ -91,16 +91,21 @@ struct recovery_state { bool finalize; }; -extern struct recovery_state *recovery; +struct recovery_state * +recovery_new(const char *snap_dirname, const char *wal_dirname, + row_handler row_handler, void *row_handler_param, + snapshot_handler snapshot_handler, + int rows_per_wal); + +void +recovery_delete(struct recovery_state *r); + +void +recovery_atfork(struct recovery_state *r); -void recovery_init(const char *snap_dirname, const char *xlog_dirname, - row_handler row_handler, void *row_handler_param, - snapshot_handler snapshot_handler, join_handler join_handler, - int rows_per_wal); void recovery_update_mode(struct recovery_state *r, enum wal_mode mode); void recovery_update_io_rate_limit(struct recovery_state *r, double new_limit); -void recovery_free(); static inline bool recovery_has_data(struct recovery_state *r) @@ -122,7 +127,8 @@ void recovery_process(struct recovery_state *r, struct xrow_header *packet); struct fio_batch; void -snapshot_write_row(struct log_io *l, struct xrow_header *packet); +snapshot_write_row(struct recovery_state *r, struct log_io *l, + struct xrow_header *packet); void snapshot_save(struct recovery_state *r); #if defined(__cplusplus) diff --git a/src/box/replication.cc b/src/box/replication.cc index 295c1335149688d456629ef3417225588103a4ef..d37940c43ed7b9a44cbf5390bb7ce5dd0726a827 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -30,7 +30,6 @@ #include <say.h> #include <fiber.h> #include <stddef.h> - #include <stddef.h> #include <sys/types.h> #include <sys/socket.h> @@ -215,13 +214,6 @@ struct replication_request { void replication_join(int fd, struct xrow_header *packet) { - assert(packet->type == IPROTO_JOIN); - struct tt_uuid server_uuid = uuid_nil; - xrow_decode_join(packet, &server_uuid); - - /* Notify box about new cluster server */ - recovery->join_handler(&server_uuid); - struct replication_request *request = (struct replication_request *) malloc(sizeof(*request)); if (request == NULL) { @@ -636,10 +628,10 @@ replication_relay_recv(ev_loop * /* loop */, struct ev_io *w, int __attribute__( /** Send a single row to the client. */ static void -replication_relay_send_row(void * /* param */, struct xrow_header *packet) +replication_relay_send_row(struct recovery_state *r, void * /* param */, + struct xrow_header *packet) { assert(iproto_type_is_dml(packet->type)); - struct recovery_state *r = recovery; /* Don't duplicate data */ if (packet->server_id == 0 || packet->server_id != r->server_id) { @@ -749,24 +741,25 @@ replication_relay_loop() sio_setfl(relay.sock, O_NONBLOCK, 0); /* Initialize the recovery process */ - recovery_init(cfg_snap_dir, cfg_wal_dir, + struct recovery_state *r = recovery_new(cfg_snap_dir, cfg_wal_dir, replication_relay_send_row, - NULL, NULL, NULL, INT32_MAX); - + NULL, NULL, INT32_MAX); + int rc = EXIT_SUCCESS; try { switch (relay.type) { case IPROTO_JOIN: - replication_relay_join(recovery); + replication_relay_join(r); break; case IPROTO_SUBSCRIBE: - replication_relay_subscribe(recovery); + replication_relay_subscribe(r); break; default: assert(false); } } catch (Exception *e) { say_error("relay error: %s", e->errmsg()); - exit(EXIT_FAILURE); + rc = EXIT_FAILURE; } - exit(EXIT_SUCCESS); + recovery_delete(r); + exit(rc); } diff --git a/src/box/txn.cc b/src/box/txn.cc index 180980e2c37d06314ec7ef16dd4718009128a2d5..cd74fae9b773287a5c2d11ac50498f4c510d14b9 100644 --- a/src/box/txn.cc +++ b/src/box/txn.cc @@ -27,6 +27,7 @@ * SUCH DAMAGE. */ #include "txn.h" +#include "box.h" #include "tuple.h" #include "space.h" #include <tarantool.h> diff --git a/src/tarantool.cc b/src/tarantool.cc index 1c8873f79583788d267f9099955bb5537c392ca4..5a26d8436f80a2811b4316db60becff69050140b 100644 --- a/src/tarantool.cc +++ b/src/tarantool.cc @@ -265,6 +265,12 @@ signal_reset() say_syserror("sigprocmask"); } +static void +tarantool_atfork() +{ + signal_reset(); + box_atfork(); +} /** * Adjust the process signal mask and add handlers for signals. @@ -303,7 +309,7 @@ signal_init(void) for (int i = 0; i < ev_sig_count; i++) ev_signal_start(loop(), &ev_sigs[i]); - (void) tt_pthread_atfork(NULL, NULL, signal_reset); + (void) tt_pthread_atfork(NULL, NULL, tarantool_atfork); } static void