Skip to content
Snippets Groups Projects
Commit 45efa082 authored by Yuriy Vostrikov's avatar Yuriy Vostrikov
Browse files

[core, feeder] support for v11 rows over network replication

parent 512f7c34
No related branches found
No related tags found
No related merge requests found
...@@ -67,6 +67,7 @@ struct recovery_state { ...@@ -67,6 +67,7 @@ struct recovery_state {
log_io_class->reader is responsible of converting data from old format */ log_io_class->reader is responsible of converting data from old format */
row_handler *wal_row_handler, *snap_row_handler; row_handler *wal_row_handler, *snap_row_handler;
ev_timer wal_timer; ev_timer wal_timer;
ev_tstamp recovery_lag;
int snap_io_rate_limit; int snap_io_rate_limit;
......
...@@ -39,23 +39,23 @@ ...@@ -39,23 +39,23 @@
#include "log_io_internal.h" #include "log_io_internal.h"
static struct tbuf * static struct tbuf *
row_reader_v04(struct palloc_pool *pool) row_reader_v11(struct palloc_pool *pool)
{ {
const int header_size = offsetof(struct row_v04, data); const int header_size = sizeof(struct row_v11);
struct tbuf *m = tbuf_alloc(pool); struct tbuf *m = tbuf_alloc(pool);
tbuf_ensure(m, header_size);
if (fiber_read(m->data, header_size) != header_size) { if (fiber_read(m->data, header_size) != header_size) {
say_error("unexpected eof reading row header"); say_error("unexpected eof reading row header");
return NULL; return NULL;
} }
m->len = header_size; tbuf_ensure(m, header_size + row_v11(m)->len);
m->len = header_size + row_v11(m)->len;
tbuf_ensure(m, header_size + row_v04(m)->len); if (fiber_read(row_v11(m)->data, row_v11(m)->len) != row_v11(m)->len) {
if (fiber_read(row_v04(m)->data, row_v04(m)->len) != row_v04(m)->len) {
say_error("unexpected eof reading row body"); say_error("unexpected eof reading row body");
return NULL; return NULL;
} }
m->len += row_v04(m)->len;
say_debug("read row bytes:%" PRIu32 " %s", m->len, tbuf_to_hex(m)); say_debug("read row bytes:%" PRIu32 " %s", m->len, tbuf_to_hex(m));
return m; return m;
...@@ -91,21 +91,26 @@ pull_from_remote(void *state) ...@@ -91,21 +91,26 @@ pull_from_remote(void *state)
warning_said = false; warning_said = false;
} }
row = row_reader_v04(fiber->pool); row = row_reader_v11(fiber->pool);
if (row == NULL) { if (row == NULL) {
fiber_close(); fiber_close();
fiber_sleep(reconnect_delay); fiber_sleep(reconnect_delay);
continue; continue;
} }
r->recovery_lag = ev_now() - row_v11(row)->tm;
i64 lsn = row_v11(row)->lsn;
struct tbuf *data = tbuf_alloc(row->pool);
tbuf_append(data, row_v11(row)->data, row_v11(row)->len);
if (r->wal_row_handler(r, row) < 0) if (r->wal_row_handler(r, row) < 0)
panic("replication failure: can't apply row"); panic("replication failure: can't apply row");
if (wal_write(r, row_v11(row)->lsn, row) == false) if (wal_write(r, lsn, data) == false)
panic("replication failure: can't write row to WAL"); panic("replication failure: can't write row to WAL");
next_lsn(r, row_v11(row)->lsn); next_lsn(r, lsn);
confirm_lsn(r, row_v11(row)->lsn); confirm_lsn(r, lsn);
if (rows++ % 1000 == 0) { if (rows++ % 1000 == 0) {
prelease(fiber->pool); prelease(fiber->pool);
......
...@@ -34,7 +34,7 @@ ...@@ -34,7 +34,7 @@
static char *custom_proc_title; static char *custom_proc_title;
static int static int
send_row(struct recovery_state *r __unused__, const struct tbuf *t) send_row(struct recovery_state *r __unused__, struct tbuf *t)
{ {
u8 *data = t->data; u8 *data = t->data;
ssize_t bytes, len = t->len; ssize_t bytes, len = t->len;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment