diff --git a/changelogs/unreleased/gh-8108-adjust-xlog_read_ahead.md b/changelogs/unreleased/gh-8108-adjust-xlog_read_ahead.md new file mode 100644 index 0000000000000000000000000000000000000000..25e56c1b37390d0b76069bdbec16c40c468f0f7b --- /dev/null +++ b/changelogs/unreleased/gh-8108-adjust-xlog_read_ahead.md @@ -0,0 +1,4 @@ +## feature/core + +* Reduced recovery time from a snapshot by up to 2x on the systems with a hard + disk drive (gh-8108). diff --git a/src/box/xlog.c b/src/box/xlog.c index ca8f2978d9d76fe3367eae2a4283e9f34c6f2d4e..ec7c73f6c83f6e1b06e2baa95b312627ee40f8fa 100644 --- a/src/box/xlog.c +++ b/src/box/xlog.c @@ -1526,7 +1526,13 @@ xlog_close(struct xlog *l, bool reuse_fd) /* {{{ struct xlog_cursor */ -#define XLOG_READ_AHEAD (1 << 14) +enum { + /** + * Lower and upper bounds of the adaptive xlog_cursor::read_ahead. + */ + XLOG_READ_AHEAD_MIN = XLOG_TX_AUTOCOMMIT_THRESHOLD, + XLOG_READ_AHEAD_MAX = 8 * 1024 * 1024, +}; /** * Ensure that at least count bytes are in read buffer @@ -1545,7 +1551,7 @@ xlog_cursor_ensure(struct xlog_cursor *cursor, size_t count) return 1; size_t to_load = count - ibuf_used(&cursor->rbuf); - to_load += XLOG_READ_AHEAD; + to_load += cursor->read_ahead; void *dst = ibuf_reserve(&cursor->rbuf, to_load); if (dst == NULL) { @@ -1567,9 +1573,25 @@ xlog_cursor_ensure(struct xlog_cursor *cursor, size_t count) cursor->name); return -1; } - /* ibuf_reserve() has been called above, ibuf_alloc() must not fail */ assert((size_t)readen <= to_load); + /* ibuf_reserve() has been called above, ibuf_alloc() must not fail */ ibuf_alloc(&cursor->rbuf, readen); + /* Shrink the read buffer to reduce the memory consumption. */ + if (cursor->need_rbuf_shrink) { + ibuf_shrink(&cursor->rbuf); + cursor->need_rbuf_shrink = false; + } + /* + * Double the read-ahead, up to the maximum, after a full read. After a + * short read reset it to the minimum and request a buffer shrink. 
+ */ + if ((size_t)readen == to_load) { + if (cursor->read_ahead * 2 <= XLOG_READ_AHEAD_MAX) + cursor->read_ahead *= 2; + } else { + cursor->need_rbuf_shrink = true; + cursor->read_ahead = XLOG_READ_AHEAD_MIN; + } cursor->read_offset += readen; return ibuf_used(&cursor->rbuf) >= count ? 0: 1; } @@ -1982,6 +2004,7 @@ xlog_cursor_openfd(struct xlog_cursor *i, int fd, const char *name) { memset(i, 0, sizeof(*i)); i->fd = fd; + i->read_ahead = XLOG_READ_AHEAD_MIN; ibuf_create(&i->rbuf, &cord()->slabc, XLOG_TX_AUTOCOMMIT_THRESHOLD << 1); diff --git a/src/box/xlog.h b/src/box/xlog.h index 1d7f564aba755de68c554483118be1d57aef19d0..1cc2934611a9a4cdeb9fd2e5730b613b888a1959 100644 --- a/src/box/xlog.h +++ b/src/box/xlog.h @@ -661,6 +661,10 @@ struct xlog_cursor { char name[PATH_MAX]; /** file read buffer */ struct ibuf rbuf; + /** set by a short read: shrink rbuf after the next successful read */ + bool need_rbuf_shrink; + /** extra bytes requested on each read beyond what the caller needs */ + size_t read_ahead; /** file read position */ off_t read_offset; /** cursor for current tx */ diff --git a/src/lib/small b/src/lib/small index 5c3a0880ae4659cb662085563ff0b9eee40565fe..dd612028552908e06078f1bc94fde017bebd2446 160000 --- a/src/lib/small +++ b/src/lib/small @@ -1 +1 @@ -Subproject commit 5c3a0880ae4659cb662085563ff0b9eee40565fe +Subproject commit dd612028552908e06078f1bc94fde017bebd2446 diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt index 353571389af0d64d47533549b5723bd3b61f133e..43bda24690a1e5da9538e885f24d951bc3d4437e 100644 --- a/test/unit/CMakeLists.txt +++ b/test/unit/CMakeLists.txt @@ -180,6 +180,10 @@ create_unit_test(PREFIX xrow SOURCES xrow.cc core_test_utils.c LIBRARIES xrow unit ) +create_unit_test(PREFIX xlog + SOURCES xlog.c core_test_utils.c + LIBRARIES xlog xrow unit +) create_unit_test(PREFIX decimal SOURCES decimal.c LIBRARIES core unit diff --git a/test/unit/xlog.c b/test/unit/xlog.c new file mode 100644 index 
0000000000000000000000000000000000000000..ae4d920ef9d41d24b6a093cfd590ac735529521c --- /dev/null +++ b/test/unit/xlog.c @@ -0,0 +1,161 @@ +/* + * SPDX-License-Identifier: BSD-2-Clause + * + * Copyright 2010-2023, Tarantool AUTHORS, please see AUTHORS file. + */ + +#define UNIT_TAP_COMPATIBLE 1 +#include "unit.h" +#include "xlog.h" +#include "xrow.h" +#include "crc32.h" +#include "random.h" +#include "memory.h" +#include "iproto_constants.h" + +/** + * Read-ahead bounds of the xlog cursor. Keep in sync with src/box/xlog.c! + */ +enum { + XLOG_READ_AHEAD_MIN = 128 * 1024, + XLOG_READ_AHEAD_MAX = 8 * 1024 * 1024, +}; + +/** + * Create a temporary directory, initialize it as xdir, and create a new xlog. + */ +static void +create_xlog(struct xlog *xlog, char *dirname) +{ + fail_if(mkdtemp(dirname) == NULL); + + struct xdir xdir; + struct tt_uuid tt_uuid; + struct vclock vclock; + memset(&tt_uuid, 1, sizeof(tt_uuid)); + memset(&vclock, 0, sizeof(vclock)); + + xdir_create(&xdir, dirname, XLOG, &tt_uuid, &xlog_opts_default); + + fail_if(xdir_create_xlog(&xdir, xlog, &vclock) < 0); +} + +/** + * Write one IPROTO_INSERT row with the given tuple data to the xlog. + */ +static void +write_tuple(struct xlog *xlog, const char *data, uint32_t size) +{ + static int64_t lsn; + struct request_replace_body body; + request_replace_body_create(&body, 0); + + struct xrow_header row; + memset(&row, 0, sizeof(struct xrow_header)); + row.lsn = ++lsn; + row.type = IPROTO_INSERT; + row.bodycnt = 2; + row.body[0].iov_base = &body; + row.body[0].iov_len = sizeof(body); + row.body[1].iov_base = (char *)data; + row.body[1].iov_len = size; + + fail_if(xlog_write_row(xlog, &row) < 0); +} + +/** + * Write a 1 KB tuple (bin header + random payload) to the xlog. The + * compressed size is roughly the same due to the randomness of the data. 
+ */ +static void +write_1k(struct xlog *xlog) +{ + char data[1024]; + const size_t data_size = sizeof(data) - 3; + random_bytes(mp_encode_binl(data, data_size), data_size); + write_tuple(xlog, data, sizeof(data)); +} + +/** + * Test that the read buffer grows dynamically while reading a large file + * and shrinks when the xlog is written/read in small portions of data. + */ +static void +test_dynamic_sized_ibuf(void) +{ + header(); + plan(4); + struct xlog xlog; + char dirname[] = "./xlog.XXXXXX"; + char filename[PATH_MAX]; + create_xlog(&xlog, dirname); + strlcpy(filename, xlog.filename, sizeof(filename)); + + /* Write about 20 MB of data to the xlog. */ + for (int i = 0; i < 20 * 1024; i++) + write_1k(&xlog); + fail_if(xlog_flush(&xlog) < 0); + + struct xlog_cursor cursor; + fail_if(xlog_cursor_open(&cursor, xlog.filename) < 0); + + /* + * Read the whole xlog and check that read_ahead and the buffer usage + * reach the maximum. The values decrease again near the end of the + * file, so track the largest ones seen on each iteration. + */ + int rc; + int64_t prev_lsn = 0; + struct xrow_header row; + size_t read_ahead_max = 0; + size_t ibuf_used_max = 0; + while ((rc = xlog_cursor_next(&cursor, &row, false)) == 0) { + fail_if(row.lsn != prev_lsn + 1); + prev_lsn = row.lsn; + + read_ahead_max = MAX(cursor.read_ahead, read_ahead_max); + ibuf_used_max = MAX(ibuf_used_max, ibuf_used(&cursor.rbuf)); + } + + is(read_ahead_max, XLOG_READ_AHEAD_MAX, + "read_ahead increased to %d", XLOG_READ_AHEAD_MAX); + ok(ibuf_used_max >= XLOG_READ_AHEAD_MAX, + "ibuf size increased to at least %d", XLOG_READ_AHEAD_MAX); + + /* + * Do 1 KB write/read to shrink the read buffer to the minimal capacity. 
+ */ + write_1k(&xlog); + fail_if(xlog_flush(&xlog) < 0); + while ((rc = xlog_cursor_next(&cursor, &row, false)) == 0) { + fail_if(row.lsn != prev_lsn + 1); + prev_lsn = row.lsn; + } + + is(cursor.read_ahead, XLOG_READ_AHEAD_MIN, + "read_ahead decreased to %d", XLOG_READ_AHEAD_MIN); + ok(ibuf_capacity(&cursor.rbuf) == 0, "ibuf capacity decreased to 0"); + + xlog_cursor_close(&cursor, false); + fail_if(xlog_close(&xlog, false) < 0); + unlink(filename); + rmdir(dirname); + + check_plan(); + footer(); +} + +int +main(void) +{ + plan(1); + crc32_init(); + memory_init(); + random_init(); + + test_dynamic_sized_ibuf(); + + random_free(); + memory_free(); + return check_plan(); +}