diff --git a/core/log_io_remote.c b/core/log_io_remote.c index 5e5d66539f9e97d271f1edf3b755003c100ccca2..f466cdcce5ec94b7fb91802b44a4bc301e0ce5f3 100644 --- a/core/log_io_remote.c +++ b/core/log_io_remote.c @@ -42,27 +42,36 @@ struct remote_state { int (*handler) (struct recovery_state * r, struct tbuf *row); }; +static u32 +row_v11_len(struct tbuf *r) +{ + if (r->len < sizeof(struct row_v11)) + return 0; + + if (r->len < sizeof(struct row_v11) + row_v11(r)->len) + return 0; + + return sizeof(struct row_v11) + row_v11(r)->len; +} + static struct tbuf * -row_reader_v11(struct palloc_pool *pool) +row_reader_v11() { const int header_size = sizeof(struct row_v11); - struct tbuf *m = tbuf_alloc(pool); - tbuf_ensure(m, header_size); + struct tbuf *m; - if (fiber_read(m->data, header_size) != header_size) { - say_error("unexpected eof reading row header"); - return NULL; - } - tbuf_ensure(m, header_size + row_v11(m)->len); - m->len = header_size + row_v11(m)->len; + for (;;) { + if (row_v11_len(fiber->rbuf) != 0) { + m = tbuf_split(fiber->rbuf, row_v11_len(fiber->rbuf)); + say_debug("read row bytes:%" PRIu32 " %s", m->len, tbuf_to_hex(m)); + return m; + } - if (fiber_read(row_v11(m)->data, row_v11(m)->len) != row_v11(m)->len) { - say_error("unexpected eof reading row body"); - return NULL; + if (fiber_bread(fiber->rbuf, header_size) <= 0) { + say_error("unexpected eof reading row header"); + return NULL; + } } - - say_debug("read row bytes:%" PRIu32 " %s", m->len, tbuf_to_hex(m)); - return m; } static struct tbuf * @@ -135,7 +144,7 @@ pull_from_remote(void *state) if (h->handler(h->r, row) < 0) continue; - prelease_after(fiber->pool, 128 * 1024); + fiber_gc(); } }