Skip to content
Snippets Groups Projects
Commit eaafae0c authored by Yuriy Vostrikov's avatar Yuriy Vostrikov
Browse files

[core] use buffered read when doing recovery

parent 62a9f4cf
No related branches found
No related tags found
No related merge requests found
......@@ -42,27 +42,36 @@ struct remote_state {
int (*handler) (struct recovery_state * r, struct tbuf *row);
};
static u32
row_v11_len(struct tbuf *r)
{
if (r->len < sizeof(struct row_v11))
return 0;
if (r->len < sizeof(struct row_v11) + row_v11(r)->len)
return 0;
return sizeof(struct row_v11) + row_v11(r)->len;
}
static struct tbuf *
row_reader_v11(struct palloc_pool *pool)
row_reader_v11()
{
const int header_size = sizeof(struct row_v11);
struct tbuf *m = tbuf_alloc(pool);
tbuf_ensure(m, header_size);
struct tbuf *m;
if (fiber_read(m->data, header_size) != header_size) {
say_error("unexpected eof reading row header");
return NULL;
}
tbuf_ensure(m, header_size + row_v11(m)->len);
m->len = header_size + row_v11(m)->len;
for (;;) {
if (row_v11_len(fiber->rbuf) != 0) {
m = tbuf_split(fiber->rbuf, row_v11_len(fiber->rbuf));
say_debug("read row bytes:%" PRIu32 " %s", m->len, tbuf_to_hex(m));
return m;
}
if (fiber_read(row_v11(m)->data, row_v11(m)->len) != row_v11(m)->len) {
say_error("unexpected eof reading row body");
return NULL;
if (fiber_bread(fiber->rbuf, header_size) <= 0) {
say_error("unexpected eof reading row header");
return NULL;
}
}
say_debug("read row bytes:%" PRIu32 " %s", m->len, tbuf_to_hex(m));
return m;
}
static struct tbuf *
......@@ -135,7 +144,7 @@ pull_from_remote(void *state)
if (h->handler(h->r, row) < 0)
continue;
prelease_after(fiber->pool, 128 * 1024);
fiber_gc();
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment