diff --git a/core/log_io.c b/core/log_io.c
index 55a0ba46d706558809bbe6359386893e291bcff0..9c8d669788256b067345fac07f9f5045f1390d7a 100644
--- a/core/log_io.c
+++ b/core/log_io.c
@@ -48,6 +48,7 @@
 #include <tbuf.h>
 
 const u16 default_tag = 0;
+const u32 default_version = 11;
 const u32 snap_marker_v04 = -1U;
 const u64 xlog_marker_v04 = -1ULL;
 const u64 xlog_eof_marker_v04 = 0;
diff --git a/core/log_io_remote.c b/core/log_io_remote.c
index 9d28fd456175904f965b4457ce45766abb857065..4a47d6787d6914ce25ff7187559c66e5475b658a 100644
--- a/core/log_io_remote.c
+++ b/core/log_io_remote.c
@@ -38,6 +38,14 @@
 #include <log_io.h>
 #include "log_io_internal.h"
 
+
+/* Argument passed to pull_from_remote: the recovery state and the row handler. */
+struct remote_state {
+	struct recovery_state *r;
+	int (*handler)(struct recovery_state *r, struct tbuf *row);
+};
+
+
 static struct tbuf *
 row_reader_v11(struct palloc_pool *pool)
 {
@@ -61,76 +69,139 @@ row_reader_v11(struct palloc_pool *pool)
 	return m;
 }
 
-static void
-pull_from_remote(void *state)
+/*
+ * Read the next row from the feeder, connecting first if fiber->fd < 0.
+ *
+ * A new connection starts with a handshake: initial_lsn is sent and the
+ * feeder must answer with default_version. On any failure the connection
+ * is closed and the attempt is repeated after reconnect_delay seconds,
+ * so the function returns only once a row has been read.
+ */
+static struct tbuf *
+remote_read_row(i64 initial_lsn)
 {
-	struct recovery_state *r = state;
 	struct tbuf *row;
-	i64 lsn;
-	int rows = 0;
 	bool warning_said = false;
 	const int reconnect_delay = 1;
+	const char *err = NULL;
+	u32 version;
 
 	for (;;) {
 		if (fiber->fd < 0) {
 			if (fiber_connect(fiber->data) < 0) {
-				if (!warning_said) {
-					say_syserror("can't connect to feeder");
-					say_info("will retry every %i second", reconnect_delay);
-					warning_said = true;
-				}
-				fiber_sleep(reconnect_delay);
-				continue;
+				err = "can't connect to feeder";
+				goto err;
+			}
+
+			if (fiber_write(&initial_lsn, sizeof(initial_lsn)) != sizeof(initial_lsn)) {
+				err = "can't write initial lsn";
+				goto err;
 			}
-			say_crit("succefully connected to feeder");
 
-			lsn = confirmed_lsn(r) + 1;
-			fiber_write(&lsn, sizeof(lsn));
+			if (fiber_read(&version, sizeof(version)) != sizeof(version)) {
+				err = "can't read version";
+				goto err;
+			}
+
+			if (version != default_version) {
+				err = "remote version mismatch";
+				goto err;
+			}
 
-			say_crit("starting remote recovery from lsn:%" PRIi64, lsn);
+			say_crit("succefully connected to feeder");
+			say_crit("starting remote recovery from lsn:%" PRIi64, initial_lsn);
 			warning_said = false;
+			err = NULL;
 		}
 
 		row = row_reader_v11(fiber->pool);
 		if (row == NULL) {
-			fiber_close();
-			fiber_sleep(reconnect_delay);
-			continue;
+			err = "can't read row";
+			goto err;
 		}
 
-		r->recovery_lag = ev_now() - row_v11(row)->tm;
-		i64 lsn = row_v11(row)->lsn;
-		struct tbuf *data = tbuf_alloc(row->pool);
-		tbuf_append(data, row_v11(row)->data, row_v11(row)->len);
+		return row;
+
+	err:
+		if (err != NULL && !warning_said) {
+			say_info("%s", err);
+			say_info("will retry every %i second", reconnect_delay);
+			warning_said = true;
+		}
+		/* close so that the next pass reconnects and repeats the handshake */
+		if (fiber->fd >= 0)
+			fiber_close();
+		fiber_sleep(reconnect_delay);
+	}
+}
 
-		if (r->wal_row_handler(r, row) < 0)
-			panic("replication failure: can't apply row");
+/*
+ * Fiber body: read rows from the feeder in a loop and hand each one to
+ * the handler stored in the struct remote_state passed as state.
+ */
+static void
+pull_from_remote(void *state)
+{
+	struct remote_state *h = state;
+	struct tbuf *row;
 
-		if (wal_write(r, lsn, data) == false)
-			panic("replication failure: can't write row to WAL");
+	for (;;) {
+		row = remote_read_row(confirmed_lsn(h->r) + 1);
+		h->r->recovery_lag = ev_now() - row_v11(row)->tm;
 
-		next_lsn(r, lsn);
-		confirm_lsn(r, lsn);
+		if (h->handler(h->r, row) < 0)
+			continue;
 
-		if (rows++ % 1000 == 0) {
-			prelease(fiber->pool);
-			rows = 0;
-		}
+		prelease_after(fiber->pool, 128 * 1024);
 	}
 }
 
+
+/*
+ * Default handler for rows received from the feeder: applies the row with
+ * r->wal_row_handler, writes it with wal_write and confirms its lsn.
+ * Panics if either step fails; otherwise returns 0.
+ */
+int
+default_remote_row_handler(struct recovery_state *r, struct tbuf *row)
+{
+	struct tbuf *data;
+	i64 lsn = row_v11(row)->lsn;
+
+	/* save row data since wal_row_handler may clobber it */
+	data = tbuf_alloc(row->pool);
+	tbuf_append(data, row_v11(row)->data, row_v11(row)->len);
+
+	if (r->wal_row_handler(r, row) < 0)
+		panic("replication failure: can't apply row");
+
+	if (wal_write(r, lsn, data) == false)
+		panic("replication failure: can't write row to WAL");
+
+	next_lsn(r, lsn);
+	confirm_lsn(r, lsn);
+
+	return 0;
+}
+
 struct fiber *
-recover_follow_remote(struct recovery_state *r, char *ip_addr, int port)
+recover_follow_remote(struct recovery_state *r, char *ip_addr, int port, int (*handler)(struct recovery_state *r, struct tbuf *row))
 {
 	char *name;
 	struct fiber *f;
 	struct in_addr server;
 	struct sockaddr_in *addr;
+	struct remote_state *h;
 
 	say_crit("initializing remote hot standby, WAL feeder %s:%i", ip_addr, port);
 	name = palloc(eter_pool, 64);
 	snprintf(name, 64, "remote_hot_standby/%s:%i", ip_addr, port);
-	f = fiber_create(name, -1, -1, pull_from_remote, r);
+
+	h = palloc(eter_pool, sizeof(*h));
+	h->r = r;
+	h->handler = handler;
+
+	f = fiber_create(name, -1, -1, pull_from_remote, h);
 	if (f == NULL)
 		return NULL;
 
diff --git a/include/log_io.h b/include/log_io.h
index fbaf388b483c200e6596eaad44eacfa2130a7878..e783b94d19d65b0cf7b3da45dfc972cd90d1b9a2 100644
--- a/include/log_io.h
+++ b/include/log_io.h
@@ -38,6 +38,7 @@
 #define RECOVER_READONLY 1
 
 extern const u16 default_tag;
+extern const u32 default_version;
 
 struct log_io;
 struct recovery_state;
@@ -91,7 +92,11 @@ struct child *wal_writer(struct recovery_state *r);
 
 int read_log(const char *filename, row_reader reader,
 	     row_handler xlog_handler, row_handler snap_handler, void *state);
-struct fiber *recover_follow_remote(struct recovery_state *r, char *ip_addr, int port);
+
+int default_remote_row_handler(struct recovery_state *r, struct tbuf *row);
+struct fiber *recover_follow_remote(struct recovery_state *r, char *ip_addr, int port,
+				    int (*handler)(struct recovery_state *r, struct tbuf *row));
+
 
 struct log_io_iter;
 void snapshot_write_row(struct log_io_iter *i, struct tbuf *row);
diff --git a/mod/feeder/feeder.c b/mod/feeder/feeder.c
index f29adbe9ad1b84eb788ac058f7e60c39eb2ffe54..04ff9f265a87c5f78ba71d3727eba01ebac7dda9 100644
--- a/mod/feeder/feeder.c
+++ b/mod/feeder/feeder.c
@@ -57,6 +57,7 @@ static void
 recover_feed_slave(int sock)
 {
 	struct recovery_state *log_io;
+	struct tbuf *ver;
 	i64 lsn;
 	ssize_t r;
 
@@ -74,6 +75,10 @@ recover_feed_slave(int sock)
 		exit(EXIT_SUCCESS);
 	}
 
+	ver = tbuf_alloc(fiber->pool);
+	tbuf_append(ver, &default_version, sizeof(default_version));
+	send_row(NULL, ver);
+
 	log_io = recover_init(NULL, cfg.wal_feeder_dir,
 			      NULL, NULL, send_row, 0, 0, 0, 64, RECOVER_READONLY, false);
 
diff --git a/mod/silverbox/box.c b/mod/silverbox/box.c
index d8b14104d0e4a3dc1b0fb09d7e4bc1ff334f87ad..2c601f687d90760c03ab216cdf2be855ce20d56a 100644
--- a/mod/silverbox/box.c
+++ b/mod/silverbox/box.c
@@ -1601,7 +1601,7 @@ box_bound_to_primary(void *data __unused__)
 		status = palloc(eter_pool, 64);
 		snprintf(status, 64, "hot_standby/%s:%i%s", cfg.wal_feeder_ipaddr,
 			 cfg.wal_feeder_port, custom_proc_title);
-		recover_follow_remote(recovery_state, cfg.wal_feeder_ipaddr, cfg.wal_feeder_port);
+		recover_follow_remote(recovery_state, cfg.wal_feeder_ipaddr, cfg.wal_feeder_port, default_remote_row_handler);
 
 		title("hot_standby/%s:%i", cfg.wal_feeder_ipaddr, cfg.wal_feeder_port);
 	} else {