diff --git a/src/box/box.cc b/src/box/box.cc index 7d6dfc8f63895f595d1593157983387a7b97ed69..ba0af95e4a6d86745c55952e748c01ffc8186ee0 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1406,7 +1406,7 @@ box_process_join(struct ev_io *io, struct xrow_header *header) /* Register the replica with the garbage collector. */ struct gc_consumer *gc = gc_consumer_register( tt_sprintf("replica %s", tt_uuid_str(&instance_uuid)), - vclock_sum(&start_vclock)); + vclock_sum(&start_vclock), GC_CONSUMER_WAL); if (gc == NULL) diag_raise(); auto gc_guard = make_scoped_guard([=]{ @@ -2082,7 +2082,8 @@ box_backup_start(int checkpoint_idx, box_backup_cb cb, void *cb_arg) return -1; } } while (checkpoint_idx-- > 0); - backup_gc = gc_consumer_register("backup", vclock_sum(vclock)); + backup_gc = gc_consumer_register("backup", vclock_sum(vclock), + GC_CONSUMER_ALL); if (backup_gc == NULL) return -1; int rc = engine_backup(vclock, cb, cb_arg); diff --git a/src/box/gc.c b/src/box/gc.c index 12e68f3dcd18749ed710f19c1568e0d3e5b8b6e8..6a05b298306bce285a07cb3b1c0c388f7f97f480 100644 --- a/src/box/gc.c +++ b/src/box/gc.c @@ -61,6 +61,10 @@ struct gc_consumer { char *name; /** The vclock signature tracked by this consumer. */ int64_t signature; + /** Consumer type, indicating that consumer only consumes + * WAL files, or both - SNAP and WAL. + */ + enum gc_consumer_type type; }; typedef rb_tree(struct gc_consumer) gc_tree_t; @@ -69,8 +73,10 @@ typedef rb_tree(struct gc_consumer) gc_tree_t; struct gc_state { /** Number of checkpoints to maintain. */ int checkpoint_count; - /** Max signature garbage collection has been called for. */ - int64_t signature; + /** Max signature WAL garbage collection has been called for. */ + int64_t wal_signature; + /** Max signature checkpoint garbage collection has been called for. */ + int64_t checkpoint_signature; /** Registered consumers, linked by gc_consumer::node. 
*/ gc_tree_t consumers; /** @@ -104,7 +110,8 @@ rb_gen(MAYBE_UNUSED static inline, gc_tree_, gc_tree_t, /** Allocate a consumer object. */ static struct gc_consumer * -gc_consumer_new(const char *name, int64_t signature) +gc_consumer_new(const char *name, int64_t signature, + enum gc_consumer_type type) { struct gc_consumer *consumer = calloc(1, sizeof(*consumer)); if (consumer == NULL) { @@ -120,6 +127,7 @@ gc_consumer_new(const char *name, int64_t signature) return NULL; } consumer->signature = signature; + consumer->type = type; return consumer; } @@ -135,7 +143,8 @@ gc_consumer_delete(struct gc_consumer *consumer) void gc_init(void) { - gc.signature = -1; + gc.wal_signature = -1; + gc.checkpoint_signature = -1; gc_tree_new(&gc.consumers); latch_create(&gc.latch); } @@ -155,21 +164,34 @@ gc_free(void) latch_destroy(&gc.latch); } +/** Find the consumer of the oldest checkpoint: the first non-WAL-only one. */ +static struct gc_consumer * +gc_tree_first_checkpoint(gc_tree_t *consumers) +{ + struct gc_consumer *consumer = gc_tree_first(consumers); + while (consumer != NULL && consumer->type == GC_CONSUMER_WAL) + consumer = gc_tree_next(consumers, consumer); + return consumer; +} + void gc_run(void) { int checkpoint_count = gc.checkpoint_count; assert(checkpoint_count > 0); - /* Look up the consumer that uses the oldest snapshot. */ + /* Look up the consumer that uses the oldest WAL. */ struct gc_consumer *leftmost = gc_tree_first(&gc.consumers); + /* Look up the consumer that uses the oldest checkpoint. */ + struct gc_consumer *leftmost_checkpoint = + gc_tree_first_checkpoint(&gc.consumers); /* * Find the oldest checkpoint that must be preserved. - * We have to maintain @checkpoint_count oldest snapshots, - * plus we can't remove snapshots that are still in use. + * We have to maintain @checkpoint_count oldest checkpoints, + * plus we can't remove checkpoints that are still in use. 
*/ - int64_t gc_signature = -1; + int64_t gc_checkpoint_signature = -1; struct checkpoint_iterator checkpoints; checkpoint_iterator_init(&checkpoints); @@ -178,17 +200,20 @@ gc_run(void) while ((vclock = checkpoint_iterator_prev(&checkpoints)) != NULL) { if (--checkpoint_count > 0) continue; - if (leftmost != NULL && - leftmost->signature < vclock_sum(vclock)) + if (leftmost_checkpoint != NULL && + leftmost_checkpoint->signature < vclock_sum(vclock)) continue; - gc_signature = vclock_sum(vclock); + gc_checkpoint_signature = vclock_sum(vclock); break; } - if (gc_signature <= gc.signature) - return; /* nothing to do */ + int64_t gc_wal_signature = MIN(gc_checkpoint_signature, + leftmost != NULL ? + leftmost->signature : INT64_MAX); - gc.signature = gc_signature; + if (gc_wal_signature <= gc.wal_signature && + gc_checkpoint_signature <= gc.checkpoint_signature) + return; /* nothing to do */ /* * Engine callbacks may sleep, because they use coio for @@ -204,8 +229,17 @@ gc_run(void) * collection for memtx snapshots first and abort if it * fails - see comment to memtx_engine_collect_garbage(). 
*/ - if (engine_collect_garbage(gc_signature) == 0) - wal_collect_garbage(gc_signature); + int rc = 0; + + if (gc_checkpoint_signature > gc.checkpoint_signature) { + gc.checkpoint_signature = gc_checkpoint_signature; + rc = engine_collect_garbage(gc_checkpoint_signature); + } + if (gc_wal_signature > gc.wal_signature) { + gc.wal_signature = gc_wal_signature; + if (rc == 0) + wal_collect_garbage(gc_wal_signature); + } latch_unlock(&gc.latch); } @@ -217,9 +251,11 @@ gc_set_checkpoint_count(int checkpoint_count) } struct gc_consumer * -gc_consumer_register(const char *name, int64_t signature) +gc_consumer_register(const char *name, int64_t signature, + enum gc_consumer_type type) { - struct gc_consumer *consumer = gc_consumer_new(name, signature); + struct gc_consumer *consumer = gc_consumer_new(name, signature, + type); if (consumer != NULL) gc_tree_insert(&gc.consumers, consumer); return consumer; diff --git a/src/box/gc.h b/src/box/gc.h index 634ce6d38421aae491fd6a7ebd5eef9841fec956..6a890b7b7d5f3d4aaef2602e06011cdb32392b29 100644 --- a/src/box/gc.h +++ b/src/box/gc.h @@ -33,6 +33,7 @@ #include <stddef.h> #include <stdint.h> +#include <stdbool.h> #if defined(__cplusplus) extern "C" { @@ -40,6 +41,13 @@ extern "C" { struct gc_consumer; +/** Consumer type: WAL files, SNAP files, or both (ALL == WAL | SNAP). */ +enum gc_consumer_type { + GC_CONSUMER_WAL = 1, + GC_CONSUMER_SNAP = 2, + GC_CONSUMER_ALL = 3, +}; + /** * Initialize the garbage collection state. */ @@ -74,12 +82,15 @@ gc_set_checkpoint_count(int checkpoint_count); * @signature until the consumer is unregistered or advanced. * @name is a human-readable name of the consumer, it will * be used for reporting the consumer to the user. + * @type consumer type, reporting whether consumer only depends + * on WAL files. * * Returns a pointer to the new consumer object or NULL on * memory allocation failure. 
*/ struct gc_consumer * -gc_consumer_register(const char *name, int64_t signature); +gc_consumer_register(const char *name, int64_t signature, + enum gc_consumer_type type); /** * Unregister a consumer and invoke garbage collection diff --git a/src/box/relay.cc b/src/box/relay.cc index a25cc540b4c44d55f6a8b09f2fd258c928d31116..c91e5aed37534cd700a9b8d4e85b6f55f1f5ddf7 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -578,7 +578,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync, if (replica->gc == NULL) { replica->gc = gc_consumer_register( tt_sprintf("replica %s", tt_uuid_str(&replica->uuid)), - vclock_sum(replica_clock)); + vclock_sum(replica_clock), GC_CONSUMER_WAL); if (replica->gc == NULL) diag_raise(); } diff --git a/test/replication/gc.result b/test/replication/gc.result index 7d6644ae6cdba6fe893fc4ed33b55b4e870e741e..a4a5ae37118274d8dc1957fc2b29a22b3b02fc7c 100644 --- a/test/replication/gc.result +++ b/test/replication/gc.result @@ -1,3 +1,6 @@ +fio = require 'fio' +--- +... test_run = require('test_run').new() --- ... @@ -115,6 +118,10 @@ wait_gc(1) --- - true ... +#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master') +--- +- true +... -- Make sure the replica will receive data it is subscribed -- to long enough for us to invoke garbage collection. box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.05) @@ -122,16 +129,34 @@ box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.05) - ok ... -- Send more data to the replica. -for i = 1, 100 do s:auto_increment{} end +-- Need to do 2 snapshots here, otherwise the replica would +-- only require 1 xlog and that case would be +-- indistinguishable from wrong operation. +for i = 1, 50 do s:auto_increment{} end +--- +... +box.snapshot() +--- +- ok +... +for i = 1, 50 do s:auto_increment{} end --- ... +box.snapshot() +--- +- ok +... -- Invoke garbage collection. Check that it doesn't remove -- xlogs needed by the replica. box.snapshot() --- - ok ... 
-#box.info.gc().checkpoints == 2 or box.info.gc() +#box.info.gc().checkpoints == 1 or box.info.gc() +--- +- true +... +#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master') --- - true ... @@ -166,6 +191,10 @@ wait_gc(1) --- - true ... +#fio.glob('./master/*.xlog') == 0 or fio.listdir('./master') +--- +- true +... -- -- Check that the master doesn't delete xlog files sent to the -- replica until it receives a confirmation that the data has @@ -199,9 +228,13 @@ fiber.sleep(0.1) -- wait for master to relay data --- ... -- Garbage collection must not delete the old xlog file --- (and the corresponding snapshot), because it is still --- needed by the replica. -#box.info.gc().checkpoints == 2 or box.info.gc() +-- because it is still needed by the replica, but remove +-- the old snapshot. +#box.info.gc().checkpoints == 1 or box.info.gc() +--- +- true +... +#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master') --- - true ... @@ -265,6 +298,10 @@ wait_gc(1) --- - true ... +#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master') +--- +- true +... -- Stop the replica. test_run:cmd("stop server replica") --- @@ -274,8 +311,18 @@ test_run:cmd("cleanup server replica") --- - true ... --- Invoke garbage collection. Check that it doesn't remove --- the checkpoint last used by the replica. +-- Invoke garbage collection. Check that it removes the old +-- checkpoint, but keeps the xlog last used by the replica. +-- once again, need 2 snapshots because after 1 snapshot +-- with no insertions after it the replica would need only +-- 1 xlog, which is stored anyways. +_ = s:auto_increment{} +--- +... +box.snapshot() +--- +- ok +... _ = s:auto_increment{} --- ... @@ -283,11 +330,15 @@ box.snapshot() --- - ok ... -#box.info.gc().checkpoints == 2 or box.info.gc() +#box.info.gc().checkpoints == 1 or box.info.gc() --- - true ... --- The checkpoint should only be deleted after the replica +#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master') +--- +- true +... 
+-- The xlog should only be deleted after the replica -- is unregistered. test_run:cleanup_cluster() --- @@ -296,6 +347,10 @@ test_run:cleanup_cluster() --- - true ... +#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master') +--- +- true +... -- -- Test that concurrent invocation of the garbage collector works fine. -- diff --git a/test/replication/gc.test.lua b/test/replication/gc.test.lua index 3a680075eb9427d2b7c5f37324049c42091baab5..1a2cccc6028248056d4eb835c03e8ef885419b25 100644 --- a/test/replication/gc.test.lua +++ b/test/replication/gc.test.lua @@ -1,3 +1,4 @@ +fio = require 'fio' test_run = require('test_run').new() engine = test_run:get_cfg('engine') replica_set = require('fast_replica') @@ -60,18 +61,25 @@ test_run:cmd("switch default") -- the replica released the corresponding checkpoint. wait_gc(1) #box.info.gc().checkpoints == 1 or box.info.gc() - +#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master') -- Make sure the replica will receive data it is subscribed -- to long enough for us to invoke garbage collection. box.error.injection.set("ERRINJ_RELAY_TIMEOUT", 0.05) -- Send more data to the replica. -for i = 1, 100 do s:auto_increment{} end +-- Need to do 2 snapshots here, otherwise the replica would +-- only require 1 xlog and that case would be +-- indistinguishable from wrong operation. +for i = 1, 50 do s:auto_increment{} end +box.snapshot() +for i = 1, 50 do s:auto_increment{} end +box.snapshot() -- Invoke garbage collection. Check that it doesn't remove -- xlogs needed by the replica. box.snapshot() -#box.info.gc().checkpoints == 2 or box.info.gc() +#box.info.gc().checkpoints == 1 or box.info.gc() +#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master') -- Remove the timeout injection so that the replica catches -- up quickly. @@ -87,7 +95,7 @@ test_run:cmd("switch default") -- from the old checkpoint. 
wait_gc(1) #box.info.gc().checkpoints == 1 or box.info.gc() - +#fio.glob('./master/*.xlog') == 0 or fio.listdir('./master') -- -- Check that the master doesn't delete xlog files sent to the -- replica until it receives a confirmation that the data has @@ -103,9 +111,10 @@ box.snapshot() -- rotate xlog for i = 1, 5 do s:auto_increment{} end fiber.sleep(0.1) -- wait for master to relay data -- Garbage collection must not delete the old xlog file --- (and the corresponding snapshot), because it is still --- needed by the replica. -#box.info.gc().checkpoints == 2 or box.info.gc() +-- because it is still needed by the replica, but remove +-- the old snapshot. +#box.info.gc().checkpoints == 1 or box.info.gc() +#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master') test_run:cmd("switch replica") -- Unblock the replica and make it fail to apply a row. box.info.replication[1].upstream.message == nil @@ -125,22 +134,28 @@ test_run:cmd("switch default") -- Now it's safe to drop the old xlog. wait_gc(1) #box.info.gc().checkpoints == 1 or box.info.gc() - +#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master') -- Stop the replica. test_run:cmd("stop server replica") test_run:cmd("cleanup server replica") --- Invoke garbage collection. Check that it doesn't remove --- the checkpoint last used by the replica. +-- Invoke garbage collection. Check that it removes the old +-- checkpoint, but keeps the xlog last used by the replica. +-- once again, need 2 snapshots because after 1 snapshot +-- with no insertions after it the replica would need only +-- 1 xlog, which is stored anyways. +_ = s:auto_increment{} +box.snapshot() _ = s:auto_increment{} box.snapshot() -#box.info.gc().checkpoints == 2 or box.info.gc() +#box.info.gc().checkpoints == 1 or box.info.gc() +#fio.glob('./master/*.xlog') == 2 or fio.listdir('./master') --- The checkpoint should only be deleted after the replica +-- The xlog should only be deleted after the replica -- is unregistered. 
test_run:cleanup_cluster() #box.info.gc().checkpoints == 1 or box.info.gc() - +#fio.glob('./master/*.xlog') == 1 or fio.listdir('./master') -- -- Test that concurrent invocation of the garbage collector works fine. --