Skip to content
Snippets Groups Projects
Commit a6578c62 authored by Yuriy Vostrikov's avatar Yuriy Vostrikov
Browse files

[core] feeder stream versioning, refactored remote recovery

parent 19e9589b
No related branches found
No related tags found
No related merge requests found
......@@ -48,6 +48,7 @@
#include <tbuf.h>
const u16 default_tag = 0;
const u32 default_version = 11;
const u32 snap_marker_v04 = -1U;
const u64 xlog_marker_v04 = -1ULL;
const u64 xlog_eof_marker_v04 = 0;
......
......@@ -38,6 +38,13 @@
#include <log_io.h>
#include "log_io_internal.h"
struct remote_state {
struct recovery_state *r;
int (*handler)(struct recovery_state *r, struct tbuf *row);
};
static struct tbuf *
row_reader_v11(struct palloc_pool *pool)
{
......@@ -61,76 +68,119 @@ row_reader_v11(struct palloc_pool *pool)
return m;
}
static void
pull_from_remote(void *state)
static struct tbuf *
remote_read_row(i64 initial_lsn)
{
struct recovery_state *r = state;
struct tbuf *row;
i64 lsn;
int rows = 0;
bool warning_said = false;
const int reconnect_delay = 1;
const char *err = NULL;
u32 version;
for (;;) {
if (fiber->fd < 0) {
if (fiber_connect(fiber->data) < 0) {
if (!warning_said) {
say_syserror("can't connect to feeder");
say_info("will retry every %i second", reconnect_delay);
warning_said = true;
}
fiber_sleep(reconnect_delay);
continue;
err = "can't connect to feeder";
goto err;
}
if (fiber_write(&initial_lsn, sizeof(initial_lsn)) != sizeof(initial_lsn)) {
err = "can't write version";
goto err;
}
say_crit("succefully connected to feeder");
lsn = confirmed_lsn(r) + 1;
fiber_write(&lsn, sizeof(lsn));
if (fiber_read(&version, sizeof(version)) != sizeof(version)) {
err = "can't read version";
goto err;
}
if (version != default_version) {
err = "remote version mismatch";
goto err;
}
say_crit("starting remote recovery from lsn:%" PRIi64, lsn);
say_crit("succefully connected to feeder");
say_crit("starting remote recovery from lsn:%" PRIi64, initial_lsn);
warning_said = false;
err = NULL;
}
row = row_reader_v11(fiber->pool);
if (row == NULL) {
fiber_close();
fiber_sleep(reconnect_delay);
continue;
err = "can't read row";
goto err;
}
r->recovery_lag = ev_now() - row_v11(row)->tm;
i64 lsn = row_v11(row)->lsn;
struct tbuf *data = tbuf_alloc(row->pool);
tbuf_append(data, row_v11(row)->data, row_v11(row)->len);
return row;
err:
if (err != NULL && !warning_said) {
say_info("%s", err);
say_info("will retry every %i second", reconnect_delay);
warning_said = true;
}
fiber_sleep(reconnect_delay);
}
}
if (r->wal_row_handler(r, row) < 0)
panic("replication failure: can't apply row");
static void
pull_from_remote(void *state)
{
struct remote_state *h = state;
struct tbuf *row;
if (wal_write(r, lsn, data) == false)
panic("replication failure: can't write row to WAL");
for (;;) {
row = remote_read_row(confirmed_lsn(h->r) + 1);
h->r->recovery_lag = ev_now() - row_v11(row)->tm;
next_lsn(r, lsn);
confirm_lsn(r, lsn);
if (h->handler(h->r, row) < 0)
continue;
if (rows++ % 1000 == 0) {
prelease(fiber->pool);
rows = 0;
}
prelease_after(fiber->pool, 128 * 1024);
}
}
int
default_remote_row_handler(struct recovery_state *r, struct tbuf *row)
{
struct tbuf *data;
i64 lsn = row_v11(row)->lsn;
/* save row data since wal_row_handler may clobber it */
data = tbuf_alloc(row->pool);
tbuf_append(data, row_v11(row)->data, row_v11(row)->len);
if (r->wal_row_handler(r, row) < 0)
panic("replication failure: can't apply row");
if (wal_write(r, lsn, data) == false)
panic("replication failure: can't write row to WAL");
next_lsn(r, lsn);
confirm_lsn(r, lsn);
return 0;
}
struct fiber *
recover_follow_remote(struct recovery_state *r, char *ip_addr, int port)
recover_follow_remote(struct recovery_state *r, char *ip_addr, int port, int (*handler)(struct recovery_state *r, struct tbuf *row))
{
char *name;
struct fiber *f;
struct in_addr server;
struct sockaddr_in *addr;
struct remote_state *h;
say_crit("initializing remote hot standby, WAL feeder %s:%i", ip_addr, port);
name = palloc(eter_pool, 64);
snprintf(name, 64, "remote_hot_standby/%s:%i", ip_addr, port);
f = fiber_create(name, -1, -1, pull_from_remote, r);
h = palloc(eter_pool, sizeof(*h));
h->r = r;
h->handler = handler;
f = fiber_create(name, -1, -1, pull_from_remote, h);
if (f == NULL)
return NULL;
......
......@@ -38,6 +38,7 @@
#define RECOVER_READONLY 1
extern const u16 default_tag;
extern const u32 default_version;
struct log_io;
struct recovery_state;
......@@ -91,7 +92,11 @@ struct child *wal_writer(struct recovery_state *r);
int read_log(const char *filename, row_reader reader,
row_handler xlog_handler, row_handler snap_handler, void *state);
struct fiber *recover_follow_remote(struct recovery_state *r, char *ip_addr, int port);
int default_remote_row_handler(struct recovery_state *r, struct tbuf *row);
struct fiber *recover_follow_remote(struct recovery_state *r, char *ip_addr, int port,
int (*handler)(struct recovery_state *r, struct tbuf *row));
struct log_io_iter;
void snapshot_write_row(struct log_io_iter *i, struct tbuf *row);
......
......@@ -57,6 +57,7 @@ static void
recover_feed_slave(int sock)
{
struct recovery_state *log_io;
struct tbuf *ver;
i64 lsn;
ssize_t r;
......@@ -74,6 +75,10 @@ recover_feed_slave(int sock)
exit(EXIT_SUCCESS);
}
ver = tbuf_alloc(fiber->pool);
tbuf_append(ver, &default_version, sizeof(default_version));
send_row(NULL, ver);
log_io = recover_init(NULL, cfg.wal_feeder_dir,
NULL, NULL, send_row, 0, 0, 0, 64, RECOVER_READONLY, false);
......
......@@ -1601,7 +1601,7 @@ box_bound_to_primary(void *data __unused__)
status = palloc(eter_pool, 64);
snprintf(status, 64, "hot_standby/%s:%i%s", cfg.wal_feeder_ipaddr,
cfg.wal_feeder_port, custom_proc_title);
recover_follow_remote(recovery_state, cfg.wal_feeder_ipaddr, cfg.wal_feeder_port);
recover_follow_remote(recovery_state, cfg.wal_feeder_ipaddr, cfg.wal_feeder_port, default_remote_row_handler);
title("hot_standby/%s:%i", cfg.wal_feeder_ipaddr, cfg.wal_feeder_port);
} else {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment