From 45efa08290ddae09114c420c1185f9f7d96f6c0d Mon Sep 17 00:00:00 2001 From: Yuriy Vostrikov <vostrikov@corp.mail.ru> Date: Thu, 11 Nov 2010 10:58:45 +0300 Subject: [PATCH] [core, feeder] support for v11 rows over network replication --- core/log_io_internal.h | 1 + core/log_io_remote.c | 25 +++++++++++++++---------- mod/feeder/feeder.c | 2 +- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/core/log_io_internal.h b/core/log_io_internal.h index bc79e1d0ad..056d3389ed 100644 --- a/core/log_io_internal.h +++ b/core/log_io_internal.h @@ -67,6 +67,7 @@ struct recovery_state { log_io_class->reader is responsible of converting data from old format */ row_handler *wal_row_handler, *snap_row_handler; ev_timer wal_timer; + ev_tstamp recovery_lag; int snap_io_rate_limit; diff --git a/core/log_io_remote.c b/core/log_io_remote.c index b31ea7a73a..9d28fd4561 100644 --- a/core/log_io_remote.c +++ b/core/log_io_remote.c @@ -39,23 +39,23 @@ #include "log_io_internal.h" static struct tbuf * -row_reader_v04(struct palloc_pool *pool) +row_reader_v11(struct palloc_pool *pool) { - const int header_size = offsetof(struct row_v04, data); + const int header_size = sizeof(struct row_v11); struct tbuf *m = tbuf_alloc(pool); + tbuf_ensure(m, header_size); if (fiber_read(m->data, header_size) != header_size) { say_error("unexpected eof reading row header"); return NULL; } - m->len = header_size; + tbuf_ensure(m, header_size + row_v11(m)->len); + m->len = header_size + row_v11(m)->len; - tbuf_ensure(m, header_size + row_v04(m)->len); - if (fiber_read(row_v04(m)->data, row_v04(m)->len) != row_v04(m)->len) { + if (fiber_read(row_v11(m)->data, row_v11(m)->len) != row_v11(m)->len) { say_error("unexpected eof reading row body"); return NULL; } - m->len += row_v04(m)->len; say_debug("read row bytes:%" PRIu32 " %s", m->len, tbuf_to_hex(m)); return m; @@ -91,21 +91,26 @@ pull_from_remote(void *state) warning_said = false; } - row = row_reader_v04(fiber->pool); + row = row_reader_v11(fiber->pool); if (row == NULL) { fiber_close(); fiber_sleep(reconnect_delay); continue; } + r->recovery_lag = ev_now() - row_v11(row)->tm; + i64 lsn = row_v11(row)->lsn; + struct tbuf *data = tbuf_alloc(row->pool); + tbuf_append(data, row_v11(row)->data, row_v11(row)->len); + if (r->wal_row_handler(r, row) < 0) panic("replication failure: can't apply row"); - if (wal_write(r, row_v11(row)->lsn, row) == false) + if (wal_write(r, lsn, data) == false) panic("replication failure: can't write row to WAL"); - next_lsn(r, row_v11(row)->lsn); - confirm_lsn(r, row_v11(row)->lsn); + next_lsn(r, lsn); + confirm_lsn(r, lsn); if (rows++ % 1000 == 0) { prelease(fiber->pool); diff --git a/mod/feeder/feeder.c b/mod/feeder/feeder.c index cf8e86bf0a..f29adbe9ad 100644 --- a/mod/feeder/feeder.c +++ b/mod/feeder/feeder.c @@ -34,7 +34,7 @@ static char *custom_proc_title; static int -send_row(struct recovery_state *r __unused__, const struct tbuf *t) +send_row(struct recovery_state *r __unused__, struct tbuf *t) { u8 *data = t->data; ssize_t bytes, len = t->len; -- GitLab