diff --git a/src/box/box.cc b/src/box/box.cc index 771f2b8cb287f2f66ac6f39d34c5a0e9c177535e..9f2fd6da14bd8def0c7446879866499c59d138dc 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -857,6 +857,13 @@ box_set_checkpoint_interval(void) gc_set_checkpoint_interval(interval); } +void +box_set_checkpoint_wal_threshold(void) +{ + int64_t threshold = cfg_geti64("checkpoint_wal_threshold"); + wal_set_checkpoint_threshold(threshold); +} + void box_set_vinyl_memory(void) { @@ -2023,6 +2030,13 @@ on_wal_garbage_collection(const struct vclock *vclock) gc_advance(vclock); } +static void +on_wal_checkpoint_threshold(void) +{ + say_info("WAL threshold exceeded, triggering checkpoint"); + gc_trigger_checkpoint(); +} + void box_init(void) { @@ -2136,7 +2150,8 @@ box_cfg_xc(void) enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode")); if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_rows, wal_max_size, &INSTANCE_UUID, &replicaset.vclock, - &checkpoint->vclock, on_wal_garbage_collection) != 0) { + &checkpoint->vclock, on_wal_garbage_collection, + on_wal_checkpoint_threshold) != 0) { diag_raise(); } diff --git a/src/box/box.h b/src/box/box.h index 91e41a9db79816bb101d8da96fe77927298ab5b2..6c6c319fc53cdcc3d41e6ae796ad9c88b5f6abb2 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -195,6 +195,7 @@ void box_set_too_long_threshold(void); void box_set_readahead(void); void box_set_checkpoint_count(void); void box_set_checkpoint_interval(void); +void box_set_checkpoint_wal_threshold(void); void box_set_memtx_memory(void); void box_set_memtx_max_tuple_size(void); void box_set_vinyl_memory(void); diff --git a/src/box/gc.c b/src/box/gc.c index 6a7e371f9d57145fc07cc3615aaad063d495bfb9..05503e688db8dc9420751e99616ed1ef2bf59015 100644 --- a/src/box/gc.c +++ b/src/box/gc.c @@ -426,6 +426,18 @@ gc_checkpoint(void) return 0; } +void +gc_trigger_checkpoint(void) +{ + if (gc.checkpoint_is_in_progress || gc.checkpoint_is_pending) + return; + + gc.checkpoint_is_pending = true; + checkpoint_schedule_reset(&gc.checkpoint_schedule, + ev_monotonic_now(loop())); + fiber_wakeup(gc.checkpoint_fiber); +} + static int gc_checkpoint_fiber_f(va_list ap) { @@ -452,7 +464,8 @@ gc_checkpoint_fiber_f(va_list ap) /* Periodic checkpointing is disabled. */ timeout = TIMEOUT_INFINITY; } - if (!fiber_yield_timeout(timeout)) { + if (!fiber_yield_timeout(timeout) && + !gc.checkpoint_is_pending) { /* * The checkpoint schedule has changed. * Reschedule the next checkpoint. @@ -460,6 +473,7 @@ gc_checkpoint_fiber_f(va_list ap) continue; } /* Time to make the next scheduled checkpoint. */ + gc.checkpoint_is_pending = false; if (gc.checkpoint_is_in_progress) { /* * Another fiber is making a checkpoint. diff --git a/src/box/gc.h b/src/box/gc.h index ffbafd34591816d4848067420839a8fc93f275d4..5790ebcc6a9228f7007c29451f82a77700505d55 100644 --- a/src/box/gc.h +++ b/src/box/gc.h @@ -151,6 +151,11 @@ struct gc_state { * Set if there's a fiber making a checkpoint right now. */ bool checkpoint_is_in_progress; + /** + * If this flag is set, the checkpoint daemon should create + * a checkpoint as soon as possible despite the schedule. + */ + bool checkpoint_is_pending; }; extern struct gc_state gc; @@ -246,6 +251,14 @@ gc_add_checkpoint(const struct vclock *vclock); int gc_checkpoint(void); +/** + * Trigger background checkpointing. + * + * The checkpoint will be created by the checkpoint daemon. + */ +void +gc_trigger_checkpoint(void); + /** * Get a reference to @checkpoint and store it in @ref. * This will block the garbage collector from deleting diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc index 4f08c78e1fcf7fa6af49cb27a1c337857dd384f3..4884ce01354ac366d42c86c3fa68beeb6c423ddf 100644 --- a/src/box/lua/cfg.cc +++ b/src/box/lua/cfg.cc @@ -175,6 +175,17 @@ lbox_cfg_set_checkpoint_interval(struct lua_State *L) return 0; } +static int +lbox_cfg_set_checkpoint_wal_threshold(struct lua_State *L) +{ + try { + box_set_checkpoint_wal_threshold(); + } catch (Exception *) { + luaT_error(L); + } + return 0; +} + static int lbox_cfg_set_read_only(struct lua_State *L) { @@ -352,6 +363,7 @@ box_lua_cfg_init(struct lua_State *L) {"cfg_set_snap_io_rate_limit", lbox_cfg_set_snap_io_rate_limit}, {"cfg_set_checkpoint_count", lbox_cfg_set_checkpoint_count}, {"cfg_set_checkpoint_interval", lbox_cfg_set_checkpoint_interval}, + {"cfg_set_checkpoint_wal_threshold", lbox_cfg_set_checkpoint_wal_threshold}, {"cfg_set_read_only", lbox_cfg_set_read_only}, {"cfg_set_memtx_memory", lbox_cfg_set_memtx_memory}, {"cfg_set_memtx_max_tuple_size", lbox_cfg_set_memtx_max_tuple_size}, diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua index 321fd3ad46300f9fc83f4ff4a98ea0be0825ca96..6dc4a2af6773071b2fbbec35689b33aa40f6b79c 100644 --- a/src/box/lua/load_cfg.lua +++ b/src/box/lua/load_cfg.lua @@ -68,6 +68,7 @@ local default_cfg = { read_only = false, hot_standby = false, checkpoint_interval = 3600, + checkpoint_wal_threshold = 1e18, checkpoint_count = 2, worker_pool_threads = 4, replication_timeout = 1, @@ -128,6 +129,7 @@ local template_cfg = { username = 'string', coredump = 'boolean', checkpoint_interval = 'number', + checkpoint_wal_threshold = 'number', checkpoint_count = 'number', read_only = 'boolean', hot_standby = 'boolean', @@ -228,6 +230,7 @@ local dynamic_cfg = { vinyl_timeout = private.cfg_set_vinyl_timeout, checkpoint_count = private.cfg_set_checkpoint_count, checkpoint_interval = private.cfg_set_checkpoint_interval, + checkpoint_wal_threshold = private.cfg_set_checkpoint_wal_threshold, worker_pool_threads = private.cfg_set_worker_pool_threads, feedback_enabled = private.feedback_daemon.set_feedback_params, feedback_host = private.feedback_daemon.set_feedback_params, diff --git a/src/box/wal.c b/src/box/wal.c index 8e56e6aeb2ca79670698a2c8d9ee740ec7bd6c70..3b50d3629249b13c6ee58e64d85c1aa0543d5698 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -92,6 +92,7 @@ struct wal_writer struct journal base; /* ----------------- tx ------------------- */ wal_on_garbage_collection_f on_garbage_collection; + wal_on_checkpoint_threshold_f on_checkpoint_threshold; /** * The rollback queue. An accumulator for all requests * that need to be rolled back. Also acts as a valve @@ -126,6 +127,23 @@ struct wal_writer * recover from it even if it is running out of disk space. */ struct vclock checkpoint_vclock; + /** Total size of WAL files written since the last checkpoint. */ + int64_t checkpoint_wal_size; + /** + * Checkpoint threshold: when the total size of WAL files + * written since the last checkpoint exceeds the value of + * this variable, the WAL thread will notify TX that it's + * time to trigger checkpointing. + */ + int64_t checkpoint_threshold; + /** + * This flag is set if the WAL thread has notified TX that + * the checkpoint threshold has been exceeded. It is cleared + * on checkpoint completion. Needed in order not to invoke + * the TX callback over and over again while checkpointing + * is in progress. + */ + bool checkpoint_triggered; /** The current WAL file. */ struct xlog current_wal; /** @@ -309,6 +327,14 @@ tx_notify_gc(struct cmsg *msg) free(msg); } +static void +tx_notify_checkpoint(struct cmsg *msg) +{ + struct wal_writer *writer = &wal_writer_singleton; + writer->on_checkpoint_threshold(); + free(msg); +} + /** * Initialize WAL writer context. Even though it's a singleton, * encapsulate the details just in case we may use @@ -320,7 +346,8 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode, int64_t wal_max_size, const struct tt_uuid *instance_uuid, const struct vclock *vclock, const struct vclock *checkpoint_vclock, - wal_on_garbage_collection_f on_garbage_collection) + wal_on_garbage_collection_f on_garbage_collection, + wal_on_checkpoint_threshold_f on_checkpoint_threshold) { writer->wal_mode = wal_mode; writer->wal_max_rows = wal_max_rows; @@ -336,11 +363,16 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode, stailq_create(&writer->rollback); cmsg_init(&writer->in_rollback, NULL); + writer->checkpoint_wal_size = 0; + writer->checkpoint_threshold = INT64_MAX; + writer->checkpoint_triggered = false; + vclock_copy(&writer->vclock, vclock); vclock_copy(&writer->checkpoint_vclock, checkpoint_vclock); rlist_create(&writer->watchers); writer->on_garbage_collection = on_garbage_collection; + writer->on_checkpoint_threshold = on_checkpoint_threshold; } /** Destroy a WAL writer structure. */ @@ -446,14 +478,16 @@ int wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows, int64_t wal_max_size, const struct tt_uuid *instance_uuid, const struct vclock *vclock, const struct vclock *checkpoint_vclock, - wal_on_garbage_collection_f on_garbage_collection) + wal_on_garbage_collection_f on_garbage_collection, + wal_on_checkpoint_threshold_f on_checkpoint_threshold) { assert(wal_max_rows > 1); struct wal_writer *writer = &wal_writer_singleton; wal_writer_create(writer, wal_mode, wal_dirname, wal_max_rows, wal_max_size, instance_uuid, vclock, - checkpoint_vclock, on_garbage_collection); + checkpoint_vclock, on_garbage_collection, + on_checkpoint_threshold); /* * Scan the WAL directory to build an index of all @@ -524,6 +558,7 @@ wal_begin_checkpoint_f(struct cbus_call_msg *data) */ } vclock_copy(&msg->vclock, &writer->vclock); + msg->wal_size = writer->checkpoint_wal_size; return 0; } @@ -533,6 +568,7 @@ wal_begin_checkpoint(struct wal_checkpoint *checkpoint) struct wal_writer *writer = &wal_writer_singleton; if (writer->wal_mode == WAL_NONE) { vclock_copy(&checkpoint->vclock, &writer->vclock); + checkpoint->wal_size = 0; return 0; } if (!stailq_empty(&writer->rollback)) { @@ -561,7 +597,20 @@ wal_commit_checkpoint_f(struct cbus_call_msg *data) { struct wal_checkpoint *msg = (struct wal_checkpoint *) data; struct wal_writer *writer = &wal_writer_singleton; + /* + * Now, once checkpoint has been created, we can update + * the WAL's version of the last checkpoint vclock and + * reset the size of WAL files written since the last + * checkpoint. Note, since new WAL records may have been + * written while the checkpoint was created, we subtract + * the value of checkpoint_wal_size observed at the time + * when checkpointing started from the current value + * rather than just setting it to 0. + */ vclock_copy(&writer->checkpoint_vclock, &msg->vclock); + assert(writer->checkpoint_wal_size >= msg->wal_size); + writer->checkpoint_wal_size -= msg->wal_size; + writer->checkpoint_triggered = false; return 0; } @@ -580,6 +629,36 @@ wal_commit_checkpoint(struct wal_checkpoint *checkpoint) fiber_set_cancellable(cancellable); } +struct wal_set_checkpoint_threshold_msg { + struct cbus_call_msg base; + int64_t checkpoint_threshold; +}; + +static int +wal_set_checkpoint_threshold_f(struct cbus_call_msg *data) +{ + struct wal_writer *writer = &wal_writer_singleton; + struct wal_set_checkpoint_threshold_msg *msg; + msg = (struct wal_set_checkpoint_threshold_msg *)data; + writer->checkpoint_threshold = msg->checkpoint_threshold; + return 0; +} + +void +wal_set_checkpoint_threshold(int64_t threshold) +{ + struct wal_writer *writer = &wal_writer_singleton; + if (writer->wal_mode == WAL_NONE) + return; + struct wal_set_checkpoint_threshold_msg msg; + msg.checkpoint_threshold = threshold; + bool cancellable = fiber_set_cancellable(false); + cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, + &msg.base, wal_set_checkpoint_threshold_f, NULL, + TIMEOUT_INFINITY); + fiber_set_cancellable(cancellable); +} + struct wal_gc_msg { struct cbus_call_msg base; @@ -891,23 +970,50 @@ wal_write_to_disk(struct cmsg *msg) /* * Iterate over requests (transactions) */ + int rc; struct journal_entry *entry; struct stailq_entry *last_committed = NULL; stailq_foreach_entry(entry, &wal_msg->commit, fifo) { wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows); entry->res = vclock_sum(&writer->vclock); - int rc = xlog_write_entry(l, entry); + rc = xlog_write_entry(l, entry); if (rc < 0) goto done; - if (rc > 0) + if (rc > 0) { + writer->checkpoint_wal_size += rc; last_committed = &entry->fifo; + } /* rc == 0: the write is buffered in xlog_tx */ } - if (xlog_flush(l) < 0) + rc = xlog_flush(l); + if (rc < 0) goto done; + writer->checkpoint_wal_size += rc; last_committed = stailq_last(&wal_msg->commit); + /* + * Notify TX if the checkpoint threshold has been exceeded. + * Use malloc() for allocating the notification message and + * don't panic on error, because if we fail to send the + * message now, we will retry next time we process a request. + */ + if (!writer->checkpoint_triggered && + writer->checkpoint_wal_size > writer->checkpoint_threshold) { + static struct cmsg_hop route[] = { + { tx_notify_checkpoint, NULL }, + }; + struct cmsg *msg = malloc(sizeof(*msg)); + if (msg != NULL) { + cmsg_init(msg, route); + cpipe_push(&wal_thread.tx_prio_pipe, msg); + writer->checkpoint_triggered = true; + } else { + say_warn("failed to allocate checkpoint " + "notification message"); + } + } + done: error = diag_last_error(diag_get()); if (error) { diff --git a/src/box/wal.h b/src/box/wal.h index 2564f7180a53157cc346477268f0dbc7415f0099..a9452f2bd4eefadf0f5a1d24f7a6cd8bee3cbab8 100644 --- a/src/box/wal.h +++ b/src/box/wal.h @@ -61,6 +61,13 @@ extern "C" { */ typedef void (*wal_on_garbage_collection_f)(const struct vclock *vclock); +/** + * Callback invoked in the TX thread when the total size of WAL + * files written since the last checkpoint exceeds the configured + * threshold. + */ +typedef void (*wal_on_checkpoint_threshold_f)(void); + void wal_thread_start(); @@ -68,7 +75,8 @@ int wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows, int64_t wal_max_size, const struct tt_uuid *instance_uuid, const struct vclock *vclock, const struct vclock *checkpoint_vclock, - wal_on_garbage_collection_f on_garbage_collection); + wal_on_garbage_collection_f on_garbage_collection, + wal_on_checkpoint_threshold_f on_checkpoint_threshold); void wal_thread_stop(); @@ -168,6 +176,12 @@ struct wal_checkpoint { * identify the new checkpoint. */ struct vclock vclock; + /** + * Size of WAL files written since the last checkpoint. + * Used to reset the corresponding WAL counter upon + * successful checkpoint creation. + */ + int64_t wal_size; }; /** @@ -189,6 +203,13 @@ wal_begin_checkpoint(struct wal_checkpoint *checkpoint); void wal_commit_checkpoint(struct wal_checkpoint *checkpoint); +/** + * Set the WAL size threshold exceeding which will trigger + * checkpointing in TX. + */ +void +wal_set_checkpoint_threshold(int64_t threshold); + /** * Remove WAL files that are not needed by consumers reading * rows at @vclock or newer. diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result index b03e5159cd0955259d0fc0f5f8ee956ce5638270..70a4b258c9a51dd22f466145ee2103fdb60595de 100644 --- a/test/app-tap/init_script.result +++ b/test/app-tap/init_script.result @@ -6,49 +6,50 @@ box.cfg 1 background:false 2 checkpoint_count:2 3 checkpoint_interval:3600 -4 coredump:false -5 feedback_enabled:true -6 feedback_host:https://feedback.tarantool.io -7 feedback_interval:3600 -8 force_recovery:false -9 hot_standby:false -10 listen:port -11 log:tarantool.log -12 log_format:plain -13 log_level:5 -14 memtx_dir:. -15 memtx_max_tuple_size:1048576 -16 memtx_memory:107374182 -17 memtx_min_tuple_size:16 -18 net_msg_max:768 -19 pid_file:box.pid -20 read_only:false -21 readahead:16320 -22 replication_connect_timeout:30 -23 replication_skip_conflict:false -24 replication_sync_lag:10 -25 replication_sync_timeout:300 -26 replication_timeout:1 -27 rows_per_wal:500000 -28 slab_alloc_factor:1.05 -29 too_long_threshold:0.5 -30 vinyl_bloom_fpr:0.05 -31 vinyl_cache:134217728 -32 vinyl_dir:. -33 vinyl_max_tuple_size:1048576 -34 vinyl_memory:134217728 -35 vinyl_page_size:8192 -36 vinyl_range_size:1073741824 -37 vinyl_read_threads:1 -38 vinyl_run_count_per_level:2 -39 vinyl_run_size_ratio:3.5 -40 vinyl_timeout:60 -41 vinyl_write_threads:4 -42 wal_dir:. -43 wal_dir_rescan_delay:2 -44 wal_max_size:268435456 -45 wal_mode:write -46 worker_pool_threads:4 +4 checkpoint_wal_threshold:1e+18 +5 coredump:false +6 feedback_enabled:true +7 feedback_host:https://feedback.tarantool.io +8 feedback_interval:3600 +9 force_recovery:false +10 hot_standby:false +11 listen:port +12 log:tarantool.log +13 log_format:plain +14 log_level:5 +15 memtx_dir:. +16 memtx_max_tuple_size:1048576 +17 memtx_memory:107374182 +18 memtx_min_tuple_size:16 +19 net_msg_max:768 +20 pid_file:box.pid +21 read_only:false +22 readahead:16320 +23 replication_connect_timeout:30 +24 replication_skip_conflict:false +25 replication_sync_lag:10 +26 replication_sync_timeout:300 +27 replication_timeout:1 +28 rows_per_wal:500000 +29 slab_alloc_factor:1.05 +30 too_long_threshold:0.5 +31 vinyl_bloom_fpr:0.05 +32 vinyl_cache:134217728 +33 vinyl_dir:. +34 vinyl_max_tuple_size:1048576 +35 vinyl_memory:134217728 +36 vinyl_page_size:8192 +37 vinyl_range_size:1073741824 +38 vinyl_read_threads:1 +39 vinyl_run_count_per_level:2 +40 vinyl_run_size_ratio:3.5 +41 vinyl_timeout:60 +42 vinyl_write_threads:4 +43 wal_dir:. +44 wal_dir_rescan_delay:2 +45 wal_max_size:268435456 +46 wal_mode:write +47 worker_pool_threads:4 -- -- Test insert from detached fiber -- diff --git a/test/box/admin.result b/test/box/admin.result index 6da53f30eb0cb2d7c295de48fbf4c756587689a3..0b233889a4eb9f42cafccda1f306619d20660f65 100644 --- a/test/box/admin.result +++ b/test/box/admin.result @@ -32,6 +32,8 @@ cfg_filter(box.cfg) - 2 - - checkpoint_interval - 3600 + - - checkpoint_wal_threshold + - 1000000000000000000 - - coredump - false - - feedback_enabled diff --git a/test/box/cfg.result b/test/box/cfg.result index 01e6bc6b9b72fca5a8e2654131c480cabcf4a9f1..68465669ee56cfc6ebcda80b69edc09ae5501a8c 100644 --- a/test/box/cfg.result +++ b/test/box/cfg.result @@ -20,6 +20,8 @@ cfg_filter(box.cfg) - 2 - - checkpoint_interval - 3600 + - - checkpoint_wal_threshold + - 1000000000000000000 - - coredump - false - - feedback_enabled @@ -119,6 +121,8 @@ cfg_filter(box.cfg) - 2 - - checkpoint_interval - 3600 + - - checkpoint_wal_threshold + - 1000000000000000000 - - coredump - false - - feedback_enabled diff --git a/test/xlog/checkpoint_threshold.result b/test/xlog/checkpoint_threshold.result new file mode 100644 index 0000000000000000000000000000000000000000..f1afec7c8a9cbde95314cbfff0f2486f32951d14 --- /dev/null +++ b/test/xlog/checkpoint_threshold.result @@ -0,0 +1,115 @@ +test_run = require('test_run').new() +--- +... +fiber = require('fiber') +--- +... +digest = require('digest') +--- +... +default_threshold = box.cfg.checkpoint_wal_threshold +--- +... +threshold = 10 * 1024 +--- +... +box.cfg{checkpoint_wal_threshold = threshold} +--- +... +s = box.schema.space.create('test') +--- +... +_ = s:create_index('pk') +--- +... +box.snapshot() +--- +- ok +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +function put(size) + s:auto_increment{digest.urandom(size)} +end; +--- +... +function wait_checkpoint(signature) + signature = signature or box.info.signature + return test_run:wait_cond(function() + local checkpoints = box.info.gc().checkpoints + return signature == checkpoints[#checkpoints].signature + end, 10) +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +-- +-- Check that checkpointing is triggered automatically once +-- the size of WAL files written since the last checkpoint +-- exceeds box.cfg.checkpoint_wal_threshold (gh-1082). +-- +for i = 1, 3 do put(threshold / 3) end +--- +... +wait_checkpoint() +--- +- true +... +for i = 1, 5 do put(threshold / 5) end +--- +... +wait_checkpoint() +--- +- true +... +-- +-- Check that WAL rows written while a checkpoint was created +-- are accounted as written after the checkpoint. +-- +box.error.injection.set('ERRINJ_SNAP_COMMIT_DELAY', true) +--- +- ok +... +-- This should trigger checkpointing, which will take quite +-- a while due to the injected delay. +for i = 1, 5 do put(threshold / 5) end +--- +... +fiber.sleep(0) +--- +... +-- Remember the future checkpoint signature. +signature = box.info.signature +--- +... +-- Insert some records while the checkpoint is created. +for i = 1, 4 do put(threshold / 5) end +--- +... +-- Disable the delay and wait for checkpointing to complete. +box.error.injection.set('ERRINJ_SNAP_COMMIT_DELAY', false) +--- +- ok +... +wait_checkpoint(signature) +--- +- true +... +-- Check that insertion of one more record triggers another +-- checkpoint, because it sums up with records written while +-- the previous checkpoint was created. +put(threshold / 5) +--- +... +wait_checkpoint() +--- +- true +... +box.cfg{checkpoint_wal_threshold = default_threshold} +--- +... diff --git a/test/xlog/checkpoint_threshold.test.lua b/test/xlog/checkpoint_threshold.test.lua new file mode 100644 index 0000000000000000000000000000000000000000..cd55de099e27af9cf6950757c13a4267f5aa52a1 --- /dev/null +++ b/test/xlog/checkpoint_threshold.test.lua @@ -0,0 +1,63 @@ +test_run = require('test_run').new() +fiber = require('fiber') +digest = require('digest') + +default_threshold = box.cfg.checkpoint_wal_threshold +threshold = 10 * 1024 +box.cfg{checkpoint_wal_threshold = threshold} + +s = box.schema.space.create('test') +_ = s:create_index('pk') +box.snapshot() + +test_run:cmd("setopt delimiter ';'") +function put(size) + s:auto_increment{digest.urandom(size)} +end; +function wait_checkpoint(signature) + signature = signature or box.info.signature + return test_run:wait_cond(function() + local checkpoints = box.info.gc().checkpoints + return signature == checkpoints[#checkpoints].signature + end, 10) +end; +test_run:cmd("setopt delimiter ''"); + +-- +-- Check that checkpointing is triggered automatically once +-- the size of WAL files written since the last checkpoint +-- exceeds box.cfg.checkpoint_wal_threshold (gh-1082). +-- +for i = 1, 3 do put(threshold / 3) end +wait_checkpoint() +for i = 1, 5 do put(threshold / 5) end +wait_checkpoint() + +-- +-- Check that WAL rows written while a checkpoint was created +-- are accounted as written after the checkpoint. +-- +box.error.injection.set('ERRINJ_SNAP_COMMIT_DELAY', true) + +-- This should trigger checkpointing, which will take quite +-- a while due to the injected delay. +for i = 1, 5 do put(threshold / 5) end +fiber.sleep(0) + +-- Remember the future checkpoint signature. +signature = box.info.signature + +-- Insert some records while the checkpoint is created. +for i = 1, 4 do put(threshold / 5) end + +-- Disable the delay and wait for checkpointing to complete. +box.error.injection.set('ERRINJ_SNAP_COMMIT_DELAY', false) +wait_checkpoint(signature) + +-- Check that insertion of one more record triggers another +-- checkpoint, because it sums up with records written while +-- the previous checkpoint was created. +put(threshold / 5) +wait_checkpoint() + +box.cfg{checkpoint_wal_threshold = default_threshold} diff --git a/test/xlog/suite.ini b/test/xlog/suite.ini index 4f82295dfc061e9e417d4d2ae8a78d6b58c7a417..4043f3700105c44211a8261c2a56c9a802822133 100644 --- a/test/xlog/suite.ini +++ b/test/xlog/suite.ini @@ -4,7 +4,7 @@ description = tarantool write ahead log tests script = xlog.lua disabled = snap_io_rate.test.lua upgrade.test.lua valgrind_disabled = -release_disabled = errinj.test.lua panic_on_lsn_gap.test.lua panic_on_broken_lsn.test.lua +release_disabled = errinj.test.lua panic_on_lsn_gap.test.lua panic_on_broken_lsn.test.lua checkpoint_threshold.test.lua config = suite.cfg use_unix_sockets = True long_run = snap_io_rate.test.lua