From d52976cd87c1221ba4ccbf35cdfc6e3deba66255 Mon Sep 17 00:00:00 2001 From: Konstantin Osipov <kostja@tarantool.org> Date: Fri, 8 Jun 2012 10:14:50 +0400 Subject: [PATCH] Extract recovery members pertinent to replica state into a separate structure. Extract struct recovery_state members which are used only when replication is on to struct remote. --- include/recovery.h | 32 ++++++------ include/say.h | 1 + mod/box/box.m | 12 +---- src/admin.m | 118 +++++++++++++++++++++++------------------- src/admin.rl | 12 +++++ src/recovery.m | 1 - src/replica.m | 37 ++++++------- test/box/admin.result | 2 - 8 files changed, 113 insertions(+), 102 deletions(-) diff --git a/include/recovery.h b/include/recovery.h index ab44e5ae69..a9741d27a7 100644 --- a/include/recovery.h +++ b/include/recovery.h @@ -63,37 +63,35 @@ wait_lsn_clear(struct wait_lsn *wait_lsn) struct wal_writer; struct wal_watcher; +/** Master connection */ +struct remote { + struct sockaddr_in addr; + struct fiber *reader; + u64 cookie; + ev_tstamp recovery_lag, recovery_last_update_tstamp; +}; + struct recovery_state { i64 lsn, confirmed_lsn; /* The WAL we're currently reading/writing from/to. */ struct log_io *current_wal; - /* - * When opening the next WAL, we want to first open - * a new file before closing the previous one. Thus - * we save the old WAL here. - */ struct log_dir *snap_dir; struct log_dir *wal_dir; struct wal_writer *writer; struct wal_watcher *watcher; - struct fiber *remote_recovery; - + struct remote *remote; /** - * Row_handler is invoked during initial recovery. - * It will be presented with the most recent format of - * data. Row_reader is responsible for converting data - * from old formats. + * row_handler is a module callback invoked during initial + * recovery and when reading rows from the master. It is + * presented with the most recent format of data. + * row_reader is responsible for converting data from old + * formats. */ row_handler *row_handler; - struct sockaddr_in remote_addr; - - ev_tstamp recovery_lag, recovery_last_update_tstamp; - int snap_io_rate_limit; int rows_per_wal; int flags; double wal_fsync_delay; - u64 cookie; struct wait_lsn wait_lsn; bool finalize; @@ -124,7 +122,7 @@ void recovery_wait_lsn(struct recovery_state *r, i64 lsn); int read_log(const char *filename, row_handler xlog_handler, row_handler snap_handler); -void recovery_follow_remote(struct recovery_state *r, const char *remote); +void recovery_follow_remote(struct recovery_state *r, const char *addr); void recovery_stop_remote(struct recovery_state *r); struct nbatch; diff --git a/include/say.h b/include/say.h index 748d01d73f..fdb9b1a387 100644 --- a/include/say.h +++ b/include/say.h @@ -42,6 +42,7 @@ enum say_level { }; extern int sayfd; +extern pid_t logger_pid; void say_logger_init(int nonblock); void vsay(int level, const char *filename, int line, const char *error, diff --git a/mod/box/box.m b/mod/box/box.m index 5eeca0479b..adaca3f825 100644 --- a/mod/box/box.m +++ b/mod/box/box.m @@ -51,8 +51,6 @@ static void box_process_rw(struct txn *txn, Port *port, u32 op, struct tbuf *request_data); box_process_func box_process = box_process_ro; -extern pid_t logger_pid; - const char *mod_name = "Box"; static char status[64] = "unknown"; @@ -436,7 +434,7 @@ mod_reload_config(struct tarantool_cfg *old_conf, struct tarantool_cfg *new_conf if (!old_is_replica && new_is_replica) memcached_stop_expire(); - if (recovery_state->remote_recovery) + if (recovery_state->remote) recovery_stop_remote(recovery_state); box_enter_master_or_replica_mode(new_conf); @@ -565,14 +563,6 @@ mod_snapshot(struct log_io *l, struct nbatch *batch) void mod_info(struct tbuf *out) { - tbuf_printf(out, " version: \"%s\"" CRLF, tarantool_version()); - tbuf_printf(out, " uptime: %i" CRLF, (int)tarantool_uptime()); - tbuf_printf(out, " pid: %i" CRLF, getpid()); - tbuf_printf(out, " logger_pid: %i" CRLF, logger_pid); - tbuf_printf(out, " lsn: %" PRIi64 CRLF, recovery_state->confirmed_lsn); - tbuf_printf(out, " recovery_lag: %.3f" CRLF, recovery_state->recovery_lag); - tbuf_printf(out, " recovery_last_update: %.3f" CRLF, - recovery_state->recovery_last_update_tstamp); tbuf_printf(out, " status: %s" CRLF, status); } diff --git a/src/admin.m b/src/admin.m index bad711f261..f1a2e3b13f 100644 --- a/src/admin.m +++ b/src/admin.m @@ -39,6 +39,7 @@ #include <stat.h> #include <tarantool.h> #include <tarantool_lua.h> +#include <recovery.h> #include TARANTOOL_CONFIG #include <tbuf.h> #include <util.h> @@ -68,7 +69,7 @@ static const char *help = static const char *unknown_command = "unknown command. try typing help." CRLF; -#line 72 "src/admin.m" +#line 73 "src/admin.m" static const int admin_start = 1; static const int admin_first_final = 135; static const int admin_error = 0; @@ -76,7 +77,7 @@ static const int admin_error = 0; static const int admin_en_main = 1; -#line 71 "src/admin.rl" +#line 72 "src/admin.rl" @@ -112,6 +113,17 @@ static void tarantool_info(struct tbuf *out) { tbuf_printf(out, "info:" CRLF); + tbuf_printf(out, " version: \"%s\"" CRLF, tarantool_version()); + tbuf_printf(out, " uptime: %i" CRLF, (int)tarantool_uptime()); + tbuf_printf(out, " pid: %i" CRLF, getpid()); + tbuf_printf(out, " logger_pid: %i" CRLF, logger_pid); + tbuf_printf(out, " lsn: %" PRIi64 CRLF, recovery_state->confirmed_lsn); + if (recovery_state->remote) { + tbuf_printf(out, " recovery_lag: %.3f" CRLF, + recovery_state->remote->recovery_lag); + tbuf_printf(out, " recovery_last_update: %.3f" CRLF, + recovery_state->remote->recovery_last_update_tstamp); + } mod_info(out); const char *path = cfg_filename_fullpath; if (path == NULL) @@ -138,12 +150,12 @@ admin_dispatch(lua_State *L) p = fiber->rbuf->data; -#line 142 "src/admin.m" +#line 154 "src/admin.m" { cs = admin_start; } -#line 147 "src/admin.m" +#line 159 "src/admin.m" { if ( p == pe ) goto _test_eof; @@ -206,15 +218,15 @@ case 6: } goto st0; tr13: -#line 241 "src/admin.rl" +#line 253 "src/admin.rl" {slab_validate(); ok(out);} goto st135; tr20: -#line 229 "src/admin.rl" +#line 241 "src/admin.rl" {return 0;} goto st135; tr25: -#line 156 "src/admin.rl" +#line 168 "src/admin.rl" { start(out); tbuf_append(out, help, strlen(help)); @@ -222,9 +234,9 @@ tr25: } goto st135; tr36: -#line 215 "src/admin.rl" +#line 227 "src/admin.rl" {strend = p;} -#line 162 "src/admin.rl" +#line 174 "src/admin.rl" { strstart[strend-strstart]='\0'; start(out); @@ -233,7 +245,7 @@ tr36: } goto st135; tr43: -#line 169 "src/admin.rl" +#line 181 "src/admin.rl" { if (reload_cfg(err)) fail(out, err); @@ -242,11 +254,11 @@ tr43: } goto st135; tr67: -#line 239 "src/admin.rl" +#line 251 "src/admin.rl" {coredump(60); ok(out);} goto st135; tr76: -#line 176 "src/admin.rl" +#line 188 "src/admin.rl" { int ret = snapshot(NULL, 0); @@ -261,9 +273,9 @@ tr76: } goto st135; tr98: -#line 225 "src/admin.rl" +#line 237 "src/admin.rl" { state = false; } -#line 189 "src/admin.rl" +#line 201 "src/admin.rl" { strstart[strend-strstart] = '\0'; if (errinj_set_byname(strstart, state)) { @@ -275,9 +287,9 @@ tr98: } goto st135; tr101: -#line 224 "src/admin.rl" +#line 236 "src/admin.rl" { state = true; } -#line 189 "src/admin.rl" +#line 201 "src/admin.rl" { strstart[strend-strstart] = '\0'; if (errinj_set_byname(strstart, state)) { @@ -289,7 +301,7 @@ tr101: } goto st135; tr117: -#line 132 "src/admin.rl" +#line 144 "src/admin.rl" { tarantool_cfg_iterator_t *i; char *key, *value; @@ -309,15 +321,15 @@ tr117: } goto st135; tr131: -#line 232 "src/admin.rl" +#line 244 "src/admin.rl" {start(out); fiber_info(out); end(out);} goto st135; tr137: -#line 231 "src/admin.rl" +#line 243 "src/admin.rl" {start(out); tarantool_info(out); end(out);} goto st135; tr146: -#line 150 "src/admin.rl" +#line 162 "src/admin.rl" { start(out); errinj_info(out); @@ -325,33 +337,33 @@ tr146: } goto st135; tr152: -#line 235 "src/admin.rl" +#line 247 "src/admin.rl" {start(out); palloc_stat(out); end(out);} goto st135; tr160: -#line 234 "src/admin.rl" +#line 246 "src/admin.rl" {start(out); slab_stat(out); end(out);} goto st135; tr164: -#line 236 "src/admin.rl" +#line 248 "src/admin.rl" {start(out); stat_print(out);end(out);} goto st135; st135: if ( ++p == pe ) goto _test_eof135; case 135: -#line 344 "src/admin.m" +#line 356 "src/admin.m" goto st0; tr14: -#line 241 "src/admin.rl" +#line 253 "src/admin.rl" {slab_validate(); ok(out);} goto st7; tr21: -#line 229 "src/admin.rl" +#line 241 "src/admin.rl" {return 0;} goto st7; tr26: -#line 156 "src/admin.rl" +#line 168 "src/admin.rl" { start(out); tbuf_append(out, help, strlen(help)); @@ -359,9 +371,9 @@ tr26: } goto st7; tr37: -#line 215 "src/admin.rl" +#line 227 "src/admin.rl" {strend = p;} -#line 162 "src/admin.rl" +#line 174 "src/admin.rl" { strstart[strend-strstart]='\0'; start(out); @@ -370,7 +382,7 @@ tr37: } goto st7; tr44: -#line 169 "src/admin.rl" +#line 181 "src/admin.rl" { if (reload_cfg(err)) fail(out, err); @@ -379,11 +391,11 @@ tr44: } goto st7; tr68: -#line 239 "src/admin.rl" +#line 251 "src/admin.rl" {coredump(60); ok(out);} goto st7; tr77: -#line 176 "src/admin.rl" +#line 188 "src/admin.rl" { int ret = snapshot(NULL, 0); @@ -398,9 +410,9 @@ tr77: } goto st7; tr99: -#line 225 "src/admin.rl" +#line 237 "src/admin.rl" { state = false; } -#line 189 "src/admin.rl" +#line 201 "src/admin.rl" { strstart[strend-strstart] = '\0'; if (errinj_set_byname(strstart, state)) { @@ -412,9 +424,9 @@ tr99: } goto st7; tr102: -#line 224 "src/admin.rl" +#line 236 "src/admin.rl" { state = true; } -#line 189 "src/admin.rl" +#line 201 "src/admin.rl" { strstart[strend-strstart] = '\0'; if (errinj_set_byname(strstart, state)) { @@ -426,7 +438,7 @@ tr102: } goto st7; tr118: -#line 132 "src/admin.rl" +#line 144 "src/admin.rl" { tarantool_cfg_iterator_t *i; char *key, *value; @@ -446,15 +458,15 @@ tr118: } goto st7; tr132: -#line 232 "src/admin.rl" +#line 244 "src/admin.rl" {start(out); fiber_info(out); end(out);} goto st7; tr138: -#line 231 "src/admin.rl" +#line 243 "src/admin.rl" {start(out); tarantool_info(out); end(out);} goto st7; tr147: -#line 150 "src/admin.rl" +#line 162 "src/admin.rl" { start(out); errinj_info(out); @@ -462,22 +474,22 @@ tr147: } goto st7; tr153: -#line 235 "src/admin.rl" +#line 247 "src/admin.rl" {start(out); palloc_stat(out); end(out);} goto st7; tr161: -#line 234 "src/admin.rl" +#line 246 "src/admin.rl" {start(out); slab_stat(out); end(out);} goto st7; tr165: -#line 236 "src/admin.rl" +#line 248 "src/admin.rl" {start(out); stat_print(out);end(out);} goto st7; st7: if ( ++p == pe ) goto _test_eof7; case 7: -#line 481 "src/admin.m" +#line 493 "src/admin.m" if ( (*p) == 10 ) goto st135; goto st0; @@ -630,28 +642,28 @@ case 23: } goto tr33; tr33: -#line 215 "src/admin.rl" +#line 227 "src/admin.rl" {strstart = p;} goto st24; st24: if ( ++p == pe ) goto _test_eof24; case 24: -#line 641 "src/admin.m" +#line 653 "src/admin.m" switch( (*p) ) { case 10: goto tr36; case 13: goto tr37; } goto st24; tr34: -#line 215 "src/admin.rl" +#line 227 "src/admin.rl" {strstart = p;} goto st25; st25: if ( ++p == pe ) goto _test_eof25; case 25: -#line 655 "src/admin.m" +#line 667 "src/admin.m" switch( (*p) ) { case 10: goto tr36; case 13: goto tr37; @@ -1101,28 +1113,28 @@ case 73: goto tr91; goto st0; tr91: -#line 223 "src/admin.rl" +#line 235 "src/admin.rl" { strstart = p; } goto st74; st74: if ( ++p == pe ) goto _test_eof74; case 74: -#line 1112 "src/admin.m" +#line 1124 "src/admin.m" if ( (*p) == 32 ) goto tr92; if ( 33 <= (*p) && (*p) <= 126 ) goto st74; goto st0; tr92: -#line 223 "src/admin.rl" +#line 235 "src/admin.rl" { strend = p; } goto st75; st75: if ( ++p == pe ) goto _test_eof75; case 75: -#line 1126 "src/admin.m" +#line 1138 "src/admin.m" switch( (*p) ) { case 32: goto st75; case 111: goto st76; @@ -1814,7 +1826,7 @@ case 134: _out: {} } -#line 247 "src/admin.rl" +#line 259 "src/admin.rl" tbuf_ltrim(fiber->rbuf, (void *)pe - (void *)fiber->rbuf->data); diff --git a/src/admin.rl b/src/admin.rl index f79cb42291..18a82bc25e 100644 --- a/src/admin.rl +++ b/src/admin.rl @@ -37,6 +37,7 @@ #include <stat.h> #include <tarantool.h> #include <tarantool_lua.h> +#include <recovery.h> #include TARANTOOL_CONFIG #include <tbuf.h> #include <util.h> @@ -103,6 +104,17 @@ static void tarantool_info(struct tbuf *out) { tbuf_printf(out, "info:" CRLF); + tbuf_printf(out, " version: \"%s\"" CRLF, tarantool_version()); + tbuf_printf(out, " uptime: %i" CRLF, (int)tarantool_uptime()); + tbuf_printf(out, " pid: %i" CRLF, getpid()); + tbuf_printf(out, " logger_pid: %i" CRLF, logger_pid); + tbuf_printf(out, " lsn: %" PRIi64 CRLF, recovery_state->confirmed_lsn); + if (recovery_state->remote) { + tbuf_printf(out, " recovery_lag: %.3f" CRLF, + recovery_state->remote->recovery_lag); + tbuf_printf(out, " recovery_last_update: %.3f" CRLF, + recovery_state->remote->recovery_last_update_tstamp); + } mod_info(out); const char *path = cfg_filename_fullpath; if (path == NULL) diff --git a/src/recovery.m b/src/recovery.m index 8505310a8b..61af76acfd 100644 --- a/src/recovery.m +++ b/src/recovery.m @@ -1229,7 +1229,6 @@ read_log(const char *filename, return 0; } - /* }}} */ /* diff --git a/src/replica.m b/src/replica.m index 40e74b159e..44ef008940 100644 --- a/src/replica.m +++ b/src/replica.m @@ -122,11 +122,11 @@ pull_from_remote(void *state) for (;;) { fiber_setcancelstate(true); - row = remote_read_row(&r->remote_addr, r->confirmed_lsn + 1); + row = remote_read_row(&r->remote->addr, r->confirmed_lsn + 1); fiber_setcancelstate(false); - r->recovery_lag = ev_now() - header_v11(row)->tm; - r->recovery_last_update_tstamp = ev_now(); + r->remote->recovery_lag = ev_now() - header_v11(row)->tm; + r->remote->recovery_last_update_tstamp = ev_now(); if (remote_apply_row(r, row) < 0) { fiber_close(); @@ -159,7 +159,7 @@ remote_apply_row(struct recovery_state *r, struct tbuf *row) assert(tag == XLOG); - if (wal_write(r, lsn, r->cookie, op, data)) + if (wal_write(r, lsn, r->remote->cookie, op, data)) panic("replication failure: can't write row to WAL"); next_lsn(r, lsn); @@ -169,7 +169,7 @@ remote_apply_row(struct recovery_state *r, struct tbuf *row) } void -recovery_follow_remote(struct recovery_state *r, const char *remote) +recovery_follow_remote(struct recovery_state *r, const char *addr) { char name[FIBER_NAME_MAXLEN]; char ip_addr[32]; @@ -178,16 +178,16 @@ recovery_follow_remote(struct recovery_state *r, const char *remote) struct fiber *f; struct in_addr server; - assert(r->remote_recovery == NULL); + assert(r->remote == NULL); - say_crit("initializing the replica, WAL master %s", remote); - snprintf(name, sizeof(name), "replica/%s", remote); + say_crit("initializing the replica, WAL master %s", addr); + snprintf(name, sizeof(name), "replica/%s", addr); f = fiber_create(name, -1, pull_from_remote, r); if (f == NULL) return; - rc = sscanf(remote, "%31[^:]:%i", ip_addr, &port); + rc = sscanf(addr, "%31[^:]:%i", ip_addr, &port); assert(rc == 2); (void)rc; @@ -196,20 +196,21 @@ recovery_follow_remote(struct recovery_state *r, const char *remote) return; } - memset(&r->remote_addr, 0, sizeof(r->remote_addr)); - r->remote_addr.sin_family = AF_INET; - memcpy(&r->remote_addr.sin_addr.s_addr, &server, sizeof(server)); - r->remote_addr.sin_port = htons(port); - memcpy(&r->cookie, &r->remote_addr, MIN(sizeof(r->cookie), sizeof(r->remote_addr))); + static struct remote remote; + memset(&remote, 0, sizeof(remote)); + remote.addr.sin_family = AF_INET; + memcpy(&remote.addr.sin_addr.s_addr, &server, sizeof(server)); + remote.addr.sin_port = htons(port); + memcpy(&remote.cookie, &remote.addr, MIN(sizeof(remote.cookie), sizeof(remote.addr))); + remote.reader = f; + r->remote = &remote; fiber_call(f); - r->remote_recovery = f; } void recovery_stop_remote(struct recovery_state *r) { say_info("shutting down the replica"); - fiber_cancel(r->remote_recovery); - r->remote_recovery = NULL; - memset(&r->remote_addr, 0, sizeof(r->remote_addr)); + fiber_cancel(r->remote->reader); + r->remote = NULL; } diff --git a/test/box/admin.result b/test/box/admin.result index 894f447316..6ffde66757 100644 --- a/test/box/admin.result +++ b/test/box/admin.result @@ -101,8 +101,6 @@ info: pid: <pid> logger_pid: <pid> lsn: 3 - recovery_lag: 0.000 - recovery_last_update: 0.000 status: primary config: "tarantool.cfg" ... -- GitLab