diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt index d7a52c5ee5ca6fe1fa688bf05f1cccaa07aa475c..5260092fd1c4546fb0767964aba5afbf6ce195f4 100644 --- a/src/box/CMakeLists.txt +++ b/src/box/CMakeLists.txt @@ -7,7 +7,6 @@ lua_source(lua_sources lua/load_cfg.lua) lua_source(lua_sources lua/schema.lua) lua_source(lua_sources lua/tuple.lua) lua_source(lua_sources lua/session.lua) -lua_source(lua_sources lua/checkpoint_daemon.lua) lua_source(lua_sources lua/feedback_daemon.lua) lua_source(lua_sources lua/net_box.lua) lua_source(lua_sources lua/upgrade.lua) diff --git a/src/box/box.cc b/src/box/box.cc index 121ad787d59b3cab1ceaea964de5cefec05522a1..771f2b8cb287f2f66ac6f39d34c5a0e9c177535e 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -850,6 +850,13 @@ box_set_checkpoint_count(void) gc_set_min_checkpoint_count(checkpoint_count); } +void +box_set_checkpoint_interval(void) +{ + double interval = cfg_getd("checkpoint_interval"); + gc_set_checkpoint_interval(interval); +} + void box_set_vinyl_memory(void) { diff --git a/src/box/box.h b/src/box/box.h index 6de0691d7cc7194d8c5bfb248be1644e36b093a8..91e41a9db79816bb101d8da96fe77927298ab5b2 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -194,6 +194,7 @@ void box_set_snap_io_rate_limit(void); void box_set_too_long_threshold(void); void box_set_readahead(void); void box_set_checkpoint_count(void); +void box_set_checkpoint_interval(void); void box_set_memtx_memory(void); void box_set_memtx_max_tuple_size(void); void box_set_vinyl_memory(void); diff --git a/src/box/gc.c b/src/box/gc.c index e1b23eeda36731addc67e0c060cef15fb733a440..e8074078ebf01de7e1614258d609ac81de319666 100644 --- a/src/box/gc.c +++ b/src/box/gc.c @@ -39,10 +39,12 @@ #include <stdint.h> #include <stdlib.h> #include <stdio.h> +#include <time.h> #define RB_COMPACT 1 #include <small/rb.h> #include <small/rlist.h> +#include <tarantool_ev.h> #include "diag.h" #include "errcode.h" @@ -55,11 +57,14 @@ #include "schema.h" #include "engine.h" /* engine_collect_garbage() */ #include "wal.h" /* wal_collect_garbage() */ +#include "checkpoint_schedule.h" struct gc_state gc; static int gc_cleanup_fiber_f(va_list); +static int +gc_checkpoint_fiber_f(va_list); /** * Comparator used for ordering gc_consumer objects by signature @@ -108,12 +113,19 @@ gc_init(void) rlist_create(&gc.checkpoints); gc_tree_new(&gc.consumers); fiber_cond_create(&gc.cleanup_cond); + checkpoint_schedule_cfg(&gc.checkpoint_schedule, 0, 0); gc.cleanup_fiber = fiber_new("gc", gc_cleanup_fiber_f); if (gc.cleanup_fiber == NULL) panic("failed to start garbage collection fiber"); + gc.checkpoint_fiber = fiber_new("checkpoint_daemon", + gc_checkpoint_fiber_f); + if (gc.checkpoint_fiber == NULL) + panic("failed to start checkpoint daemon fiber"); + fiber_start(gc.cleanup_fiber); + fiber_start(gc.checkpoint_fiber); } void @@ -293,6 +305,18 @@ gc_set_min_checkpoint_count(int min_checkpoint_count) gc.min_checkpoint_count = min_checkpoint_count; } +void +gc_set_checkpoint_interval(double interval) +{ + /* + * Reconfigure the schedule and wake up the checkpoint + * daemon so that it can readjust. + */ + checkpoint_schedule_cfg(&gc.checkpoint_schedule, + ev_monotonic_now(loop()), interval); + fiber_wakeup(gc.checkpoint_fiber); +} + void gc_add_checkpoint(const struct vclock *vclock) { @@ -327,16 +351,13 @@ gc_add_checkpoint(const struct vclock *vclock) gc_schedule_cleanup(); } -int -gc_checkpoint(void) +static int +gc_do_checkpoint(void) { int rc; struct vclock vclock; - if (gc.checkpoint_is_in_progress) { - diag_set(ClientError, ER_CHECKPOINT_IN_PROGRESS); - return -1; - } + assert(!gc.checkpoint_is_in_progress); gc.checkpoint_is_in_progress = true; /* @@ -371,6 +392,27 @@ gc_checkpoint(void) latch_unlock(&schema_lock); gc.checkpoint_is_in_progress = false; + return rc; +} + +int +gc_checkpoint(void) +{ + if (gc.checkpoint_is_in_progress) { + diag_set(ClientError, ER_CHECKPOINT_IN_PROGRESS); + return -1; + } + + /* + * Reset the schedule and wake up the checkpoint daemon + * so that it can readjust. + */ + checkpoint_schedule_reset(&gc.checkpoint_schedule, + ev_monotonic_now(loop())); + fiber_wakeup(gc.checkpoint_fiber); + + if (gc_do_checkpoint() != 0) + return -1; /* * Wait for background garbage collection that might @@ -380,10 +422,55 @@ gc_checkpoint(void) * time box.snapshot() returns, all outdated checkpoint * files have been removed. */ - if (rc == 0) - gc_wait_cleanup(); + gc_wait_cleanup(); + return 0; +} - return rc; +static int +gc_checkpoint_fiber_f(va_list ap) +{ + (void)ap; + + /* + * Make the fiber non-cancellable so as not to bother + * about spurious wakeups. + */ + fiber_set_cancellable(false); + + struct checkpoint_schedule *sched = &gc.checkpoint_schedule; + while (!fiber_is_cancelled()) { + double timeout = checkpoint_schedule_timeout(sched, + ev_monotonic_now(loop())); + if (timeout > 0) { + char buf[128]; + struct tm tm; + time_t time = (time_t)(ev_now(loop()) + timeout); + localtime_r(&time, &tm); + strftime(buf, sizeof(buf), "%c", &tm); + say_info("scheduled next checkpoint for %s", buf); + } else { + /* Periodic checkpointing is disabled. */ + timeout = TIMEOUT_INFINITY; + } + if (!fiber_yield_timeout(timeout)) { + /* + * The checkpoint schedule has changed. + * Reschedule the next checkpoint. + */ + continue; + } + /* Time to make the next scheduled checkpoint. */ + if (gc.checkpoint_is_in_progress) { + /* + * Another fiber is making a checkpoint. + * Skip this one. + */ + continue; + } + if (gc_do_checkpoint() != 0) + diag_log(); + } + return 0; } void diff --git a/src/box/gc.h b/src/box/gc.h index 1592772678e452304ded5d136168175fd85899e5..ffbafd34591816d4848067420839a8fc93f275d4 100644 --- a/src/box/gc.h +++ b/src/box/gc.h @@ -38,6 +38,7 @@ #include "fiber_cond.h" #include "vclock.h" #include "trivia/util.h" +#include "checkpoint_schedule.h" #if defined(__cplusplus) extern "C" { @@ -122,6 +123,10 @@ struct gc_state { struct rlist checkpoints; /** Registered consumers, linked by gc_consumer::node. */ gc_tree_t consumers; + /** Fiber responsible for periodic checkpointing. */ + struct fiber *checkpoint_fiber; + /** Schedule of periodic checkpoints. */ + struct checkpoint_schedule checkpoint_schedule; /** Fiber that removes old files in the background. */ struct fiber *cleanup_fiber; /** @@ -214,6 +219,13 @@ gc_advance(const struct vclock *vclock); void gc_set_min_checkpoint_count(int min_checkpoint_count); +/** + * Set the time interval between checkpoints, in seconds. + * Setting the interval to 0 disables periodic checkpointing. + */ +void +gc_set_checkpoint_interval(double interval); + /** * Track an existing checkpoint in the garbage collector state. * Note, this function may trigger garbage collection to remove diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc index c3825591cc47d2a3e96cc03ab265109bfec71df4..4f08c78e1fcf7fa6af49cb27a1c337857dd384f3 100644 --- a/src/box/lua/cfg.cc +++ b/src/box/lua/cfg.cc @@ -164,6 +164,17 @@ lbox_cfg_set_checkpoint_count(struct lua_State *L) return 0; } +static int +lbox_cfg_set_checkpoint_interval(struct lua_State *L) +{ + try { + box_set_checkpoint_interval(); + } catch (Exception *) { + luaT_error(L); + } + return 0; +} + static int lbox_cfg_set_read_only(struct lua_State *L) { @@ -340,6 +351,7 @@ box_lua_cfg_init(struct lua_State *L) {"cfg_set_too_long_threshold", lbox_cfg_set_too_long_threshold}, {"cfg_set_snap_io_rate_limit", lbox_cfg_set_snap_io_rate_limit}, {"cfg_set_checkpoint_count", lbox_cfg_set_checkpoint_count}, + {"cfg_set_checkpoint_interval", lbox_cfg_set_checkpoint_interval}, {"cfg_set_read_only", lbox_cfg_set_read_only}, {"cfg_set_memtx_memory", lbox_cfg_set_memtx_memory}, {"cfg_set_memtx_max_tuple_size", lbox_cfg_set_memtx_max_tuple_size}, diff --git a/src/box/lua/checkpoint_daemon.lua b/src/box/lua/checkpoint_daemon.lua deleted file mode 100644 index 576c4a5cca6d2f1a12dd52cf01ffe849f0db0a66..0000000000000000000000000000000000000000 --- a/src/box/lua/checkpoint_daemon.lua +++ /dev/null @@ -1,136 +0,0 @@ --- checkpoint_daemon.lua (internal file) - -local log = require 'log' -local fiber = require 'fiber' -local fio = require 'fio' -local yaml = require 'yaml' -local errno = require 'errno' -local digest = require 'digest' -local pickle = require 'pickle' - -local PREFIX = 'checkpoint_daemon' - -local daemon = { - checkpoint_interval = 0; - fiber = nil; - control = nil; -} - --- create snapshot, return true if no errors -local function snapshot() - log.info("making snapshot...") - local s, e = pcall(function() box.snapshot() end) - if s then - return true - end - -- don't complain in the log if the snapshot already exists - if errno() == errno.EEXIST then - return false - end - log.error("error while creating snapshot: %s", e) - return false -end - --- check filesystem and current time -local function process(self) - - if daemon.checkpoint_interval == nil then - return false - end - - if not(daemon.checkpoint_interval > 0) then - return false - end - - local checkpoints = box.info.gc().checkpoints - local last_checkpoint = checkpoints[#checkpoints] - - local last_snap = fio.pathjoin(box.cfg.memtx_dir, - string.format('%020d.snap', last_checkpoint.signature)) - local snstat = fio.stat(last_snap) - if snstat == nil then - log.error("can't stat %s: %s", last_snap, errno.strerror()) - return false - end - if snstat.mtime + daemon.checkpoint_interval <= fiber.time() then - return snapshot() - end -end - -local function daemon_fiber(self) - fiber.name(PREFIX, {truncate = true}) - log.info("started") - - -- - -- Add random offset to the initial period to avoid simultaneous - -- snapshotting when multiple instances of tarantool are running - -- on the same host. - -- See https://github.com/tarantool/tarantool/issues/732 - -- - local random = pickle.unpack('i', digest.urandom(4)) - local offset = random % self.checkpoint_interval - while true do - local period = self.checkpoint_interval + offset - -- maintain next_snapshot_time as a self member for testing purposes - self.next_snapshot_time = fiber.time() + period - log.info("scheduled the next snapshot at %s", - os.date("%c", self.next_snapshot_time)) - local msg = self.control:get(period) - if msg == 'shutdown' then - break - elseif msg == 'reload' then - offset = random % self.checkpoint_interval - log.info("reloaded") -- continue - elseif msg == nil and box.info.status == 'running' then - local s, e = pcall(process, self) - if not s then - log.error(e) - end - offset = 0 - end - end - self.next_snapshot_time = nil - log.info("stopped") -end - -local function reload(self) - if self.checkpoint_interval > 0 then - if self.control == nil then - -- Start daemon - self.control = fiber.channel() - self.fiber = fiber.create(daemon_fiber, self) - fiber.sleep(0) - else - -- Reload daemon - self.control:put("reload") - -- - -- channel:put() doesn't block the writer if there - -- is a ready reader. Give daemon fiber way so that - -- it can execute before reload() returns to the caller. - -- - fiber.sleep(0) - end - elseif self.control ~= nil then - -- Shutdown daemon - self.control:put("shutdown") - self.fiber = nil - self.control = nil - fiber.sleep(0) -- see comment above - end -end - -setmetatable(daemon, { - __index = { - set_checkpoint_interval = function() - daemon.checkpoint_interval = box.cfg.checkpoint_interval - reload(daemon) - return - end, - } -}) - -if box.internal == nil then - box.internal = { [PREFIX] = daemon } -else - box.internal[PREFIX] = daemon -end diff --git a/src/box/lua/init.c b/src/box/lua/init.c index ccb4c6a4650ff72606a6f80fb9bc370d746415d3..0e90f6be5147f1f5ad0e1345b19b46207e31a310 100644 --- a/src/box/lua/init.c +++ b/src/box/lua/init.c @@ -65,7 +65,6 @@ extern char session_lua[], schema_lua[], load_cfg_lua[], xlog_lua[], - checkpoint_daemon_lua[], feedback_daemon_lua[], net_box_lua[], upgrade_lua[], @@ -75,7 +74,6 @@ static const char *lua_sources[] = { "box/session", session_lua, "box/tuple", tuple_lua, "box/schema", schema_lua, - "box/checkpoint_daemon", checkpoint_daemon_lua, "box/feedback_daemon", feedback_daemon_lua, "box/upgrade", upgrade_lua, "box/net_box", net_box_lua, diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua index 38e742c858d95a4361687eb9df513e385a7998a5..321fd3ad46300f9fc83f4ff4a98ea0be0825ca96 100644 --- a/src/box/lua/load_cfg.lua +++ b/src/box/lua/load_cfg.lua @@ -227,7 +227,7 @@ local dynamic_cfg = { vinyl_cache = private.cfg_set_vinyl_cache, vinyl_timeout = private.cfg_set_vinyl_timeout, checkpoint_count = private.cfg_set_checkpoint_count, - checkpoint_interval = private.checkpoint_daemon.set_checkpoint_interval, + checkpoint_interval = private.cfg_set_checkpoint_interval, worker_pool_threads = private.cfg_set_worker_pool_threads, feedback_enabled = private.feedback_daemon.set_feedback_params, feedback_host = private.feedback_daemon.set_feedback_params, diff --git a/test/xlog/checkpoint_daemon.result b/test/xlog/checkpoint_daemon.result index 3a75137d2f26c3049f12038f37841e21ae91e8a3..6c96da0d5b51e7be2a3c7bc0d5e5f554b285d57d 100644 --- a/test/xlog/checkpoint_daemon.result +++ b/test/xlog/checkpoint_daemon.result @@ -16,6 +16,12 @@ test_run = env.new() test_run:cleanup_cluster() --- ... +default_checkpoint_count = box.cfg.checkpoint_count +--- +... +default_checkpoint_interval = box.cfg.checkpoint_interval +--- +... box.cfg{checkpoint_interval = 0} --- ... @@ -144,47 +150,6 @@ test_run:cmd("setopt delimiter ''"); --- - true ... --- restore default options -box.cfg{checkpoint_interval = 3600 * 4, checkpoint_count = 4 } ---- -... -space:drop() ---- -... -daemon = box.internal.checkpoint_daemon ---- -... --- stop daemon -box.cfg{ checkpoint_interval = 0 } ---- -... --- wait daemon to stop -while daemon.fiber ~= nil do fiber.sleep(0) end ---- -... -daemon.fiber == nil ---- -- true -... --- start daemon -box.cfg{ checkpoint_interval = 10 } ---- -... -daemon.fiber ~= nil ---- -- true -... --- reload configuration -box.cfg{ checkpoint_interval = 15, checkpoint_count = 20 } ---- -... -daemon.checkpoint_interval == 15 ---- -- true -... -daemon.checkpoint_count = 20 ---- -... -- Check that checkpoint_count can't be < 1. box.cfg{ checkpoint_count = 1 } --- @@ -198,101 +163,13 @@ box.cfg.checkpoint_count --- - 1 ... --- Start -PERIOD = 3600 ---- -... -box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD} ---- -... -snapshot_time, time = daemon.next_snapshot_time, fiber.time() ---- -... -snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD} ---- -- true -... -snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD} ---- -- true -... -daemon_fiber = daemon.fiber ---- -... -daemon_control = daemon.control ---- -... --- Reload #1 -PERIOD = 100 +-- Restore default options. +box.cfg{checkpoint_count = default_checkpoint_count} --- ... -box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD} +box.cfg{checkpoint_interval = default_checkpoint_interval} --- ... -snapshot_time, time = daemon.next_snapshot_time, fiber.time() ---- -... -snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD} ---- -- true -... -snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD} ---- -- true -... -daemon.fiber == daemon_fiber ---- -- true -... -daemon.control == daemon_control ---- -- true -... --- Reload #2 -PERIOD = 1000 ---- -... -box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD} ---- -... -snapshot_time, time = daemon.next_snapshot_time, fiber.time() ---- -... -snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD} ---- -- true -... -snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD} ---- -- true -... -daemon.fiber == daemon_fiber ---- -- true -... -daemon.control == daemon_control ---- -- true -... -daemon_control = nil ---- -... -daemin_fiber = nil ---- -... --- Shutdown -box.cfg{ checkpoint_count = 2, checkpoint_interval = 0} ---- -... -daemon.next_snapshot_time ---- -- null -... -daemon.fiber == nil ---- -- true -... -daemon.control == nil +space:drop() --- -- true ... diff --git a/test/xlog/checkpoint_daemon.test.lua b/test/xlog/checkpoint_daemon.test.lua index f34906217c7b3d9a3034da9581240fbd002b4a67..37d7f7528911e29fa18b275563aeefe23f49b9ec 100644 --- a/test/xlog/checkpoint_daemon.test.lua +++ b/test/xlog/checkpoint_daemon.test.lua @@ -6,6 +6,8 @@ test_run = env.new() test_run:cleanup_cluster() +default_checkpoint_count = box.cfg.checkpoint_count +default_checkpoint_interval = box.cfg.checkpoint_interval box.cfg{checkpoint_interval = 0} PERIOD = jit.os == 'Linux' and 0.03 or 1.5 @@ -85,62 +87,13 @@ test_run:wait_cond(function() end, WAIT_COND_TIMEOUT); test_run:cmd("setopt delimiter ''"); --- restore default options -box.cfg{checkpoint_interval = 3600 * 4, checkpoint_count = 4 } -space:drop() - -daemon = box.internal.checkpoint_daemon --- stop daemon -box.cfg{ checkpoint_interval = 0 } --- wait daemon to stop -while daemon.fiber ~= nil do fiber.sleep(0) end -daemon.fiber == nil --- start daemon -box.cfg{ checkpoint_interval = 10 } -daemon.fiber ~= nil --- reload configuration -box.cfg{ checkpoint_interval = 15, checkpoint_count = 20 } -daemon.checkpoint_interval == 15 -daemon.checkpoint_count = 20 - -- Check that checkpoint_count can't be < 1. box.cfg{ checkpoint_count = 1 } box.cfg{ checkpoint_count = 0 } box.cfg.checkpoint_count --- Start -PERIOD = 3600 -box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD} -snapshot_time, time = daemon.next_snapshot_time, fiber.time() -snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD} -snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD} - -daemon_fiber = daemon.fiber -daemon_control = daemon.control - --- Reload #1 -PERIOD = 100 -box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD} -snapshot_time, time = daemon.next_snapshot_time, fiber.time() -snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD} -snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD} -daemon.fiber == daemon_fiber -daemon.control == daemon_control - --- Reload #2 -PERIOD = 1000 -box.cfg{ checkpoint_count = 2, checkpoint_interval = PERIOD} -snapshot_time, time = daemon.next_snapshot_time, fiber.time() -snapshot_time + 1 >= time + PERIOD or {snapshot_time, time, PERIOD} -snapshot_time - 1 <= time + 2 * PERIOD or {snapshot_time, time, PERIOD} -daemon.fiber == daemon_fiber -daemon.control == daemon_control - -daemon_control = nil -daemin_fiber = nil - --- Shutdown -box.cfg{ checkpoint_count = 2, checkpoint_interval = 0} -daemon.next_snapshot_time -daemon.fiber == nil -daemon.control == nil +-- Restore default options. +box.cfg{checkpoint_count = default_checkpoint_count} +box.cfg{checkpoint_interval = default_checkpoint_interval} + +space:drop()