From 2dd126fbc479a4782516ded45bf89b8bcf536cda Mon Sep 17 00:00:00 2001 From: mechanik20051988 <mechanik20.05.1988@gmail.com> Date: Thu, 25 Mar 2021 10:04:07 +0300 Subject: [PATCH] memtx: move delayed tuples deletion from small allocator to memtx Delayed free mode in small allocator is only used to free up tuple memory, during snapshot creation. This is not directly related to the small allocator itself, so moved this code to tarantool in memtx_engine.c, where tuples memory allocation/deallocation occurs. --- src/box/memtx_engine.c | 75 +++++++++++++++++--- src/box/memtx_engine.h | 18 +++++ src/lib/small | 2 +- test/box/moved_delayed_free_mode.result | 84 +++++++++++++++++++++++ test/box/moved_delayed_free_mode.test.lua | 49 +++++++++++++ 5 files changed, 217 insertions(+), 11 deletions(-) create mode 100644 test/box/moved_delayed_free_mode.result create mode 100644 test/box/moved_delayed_free_mode.test.lua diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c index 97561e719a..0702cf31a4 100644 --- a/src/box/memtx_engine.c +++ b/src/box/memtx_engine.c @@ -79,6 +79,59 @@ enum { MAX_TUPLE_SIZE = 1 * 1024 * 1024, }; +static inline void +memtx_mem_free(struct memtx_engine *memtx, void *ptr, size_t size); + +static inline void +memtx_tuple_free(struct memtx_engine *memtx, void *item) +{ + struct memtx_tuple *memtx_tuple = (struct memtx_tuple *)item; + struct tuple *tuple = &memtx_tuple->base; + size_t total = tuple_size(tuple) + offsetof(struct memtx_tuple, base); + memtx_mem_free(memtx, memtx_tuple, total); +} + +static inline void +memtx_free_all_garbage_tuples(struct memtx_engine *memtx) +{ + void *item; + while ((item = lifo_pop(&memtx->delayed))) + memtx_tuple_free(memtx, item); +} + +static inline void +memtx_collect_garbage_tuples(struct memtx_engine *memtx) +{ + if (memtx->free_mode != MEMTX_ENGINE_COLLECT_GARBAGE) + return; + + if (!lifo_is_empty(&memtx->delayed)) { + const int BATCH = 100; + for (int i = 0; i < BATCH; i++) { + void *item = lifo_pop(&memtx->delayed); + if (item == NULL) + break; + memtx_tuple_free(memtx, item); + } + } else { + /* Finish garbage collection and switch to regular mode */ + memtx->free_mode = MEMTX_ENGINE_FREE; + } +} + +static inline void * +memtx_mem_alloc(struct memtx_engine *memtx, size_t size) +{ + memtx_collect_garbage_tuples(memtx); + return smalloc(&memtx->alloc, size); +} + +static inline void +memtx_mem_free(struct memtx_engine *memtx, void *ptr, size_t size) +{ + return smfree(&memtx->alloc, ptr, size); +} + static int memtx_end_build_primary_key(struct space *space, void *param) { @@ -142,6 +195,7 @@ memtx_engine_shutdown(struct engine *engine) mempool_destroy(&memtx->rtree_iterator_pool); mempool_destroy(&memtx->index_extent_pool); slab_cache_destroy(&memtx->index_slab_cache); + memtx_free_all_garbage_tuples(memtx); small_alloc_destroy(&memtx->alloc); slab_cache_destroy(&memtx->slab_cache); tuple_arena_destroy(&memtx->arena); @@ -1177,6 +1231,8 @@ memtx_engine_new(const char *snap_dirname, bool force_recovery, &actual_alloc_factor); say_info("Actual slab_alloc_factor calculated on the basis of desired " "slab_alloc_factor = %f", actual_alloc_factor); + lifo_init(&memtx->delayed); + memtx->free_mode = MEMTX_ENGINE_FREE; /* Initialize index extent allocator. */ slab_cache_create(&memtx->index_slab_cache, &memtx->arena); @@ -1241,7 +1297,7 @@ memtx_enter_delayed_free_mode(struct memtx_engine *memtx) { memtx->snapshot_version++; if (memtx->delayed_free_mode++ == 0) - small_alloc_setopt(&memtx->alloc, SMALL_DELAYED_FREE_MODE, true); + memtx->free_mode = MEMTX_ENGINE_DELAYED_FREE; } void @@ -1249,7 +1305,7 @@ memtx_leave_delayed_free_mode(struct memtx_engine *memtx) { assert(memtx->delayed_free_mode > 0); if (--memtx->delayed_free_mode == 0) - small_alloc_setopt(&memtx->alloc, SMALL_DELAYED_FREE_MODE, false); + memtx->free_mode = MEMTX_ENGINE_COLLECT_GARBAGE; } struct tuple * @@ -1294,7 +1350,7 @@ memtx_tuple_new(struct tuple_format *format, const char *data, const char *end) } struct memtx_tuple *memtx_tuple; - while ((memtx_tuple = smalloc(&memtx->alloc, total)) == NULL) { + while ((memtx_tuple = memtx_mem_alloc(memtx, total)) == NULL) { bool stop; memtx_engine_run_gc(memtx, &stop); if (stop) @@ -1326,13 +1382,12 @@ memtx_tuple_delete(struct tuple_format *format, struct tuple *tuple) assert(tuple_is_unreferenced(tuple)); struct memtx_tuple *memtx_tuple = container_of(tuple, struct memtx_tuple, base); - size_t total = tuple_size(tuple) + offsetof(struct memtx_tuple, base); - if (memtx->alloc.free_mode != SMALL_DELAYED_FREE || + if (memtx->free_mode != MEMTX_ENGINE_DELAYED_FREE || memtx_tuple->version == memtx->snapshot_version || format->is_temporary) - smfree(&memtx->alloc, memtx_tuple, total); + memtx_tuple_free(memtx, memtx_tuple); else - smfree_delayed(&memtx->alloc, memtx_tuple, total); + lifo_push(&memtx->delayed, (void *)memtx_tuple); tuple_format_unref(format); } @@ -1344,7 +1399,7 @@ metmx_tuple_chunk_delete(struct tuple_format *format, const char *data) container_of((const char (*)[0])data, struct tuple_chunk, data); uint32_t sz = tuple_chunk_sz(tuple_chunk->data_sz); - smfree(&memtx->alloc, tuple_chunk, sz); + memtx_mem_free(memtx, tuple_chunk, sz); } const char * @@ -1354,9 +1409,9 @@ memtx_tuple_chunk_new(struct tuple_format *format, struct tuple *tuple, struct memtx_engine *memtx = (struct memtx_engine *)format->engine; uint32_t sz = tuple_chunk_sz(data_sz); struct tuple_chunk *tuple_chunk = - (struct tuple_chunk *) smalloc(&memtx->alloc, sz); + (struct tuple_chunk *) memtx_mem_alloc(memtx, sz); if (tuple == NULL) { - diag_set(OutOfMemory, sz, "smalloc", "tuple"); + diag_set(OutOfMemory, sz, "memtx_mem_alloc", "tuple"); return NULL; } tuple_chunk->data_sz = data_sz; diff --git a/src/box/memtx_engine.h b/src/box/memtx_engine.h index 8bf35b5a4b..4f3f86a635 100644 --- a/src/box/memtx_engine.h +++ b/src/box/memtx_engine.h @@ -50,6 +50,18 @@ struct fiber; struct tuple; struct tuple_format; +/** + * Free mode, determines a strategy for freeing up memory + */ +enum memtx_engine_free_mode { + /** Free objects immediately. */ + MEMTX_ENGINE_FREE, + /** Collect garbage after delayed free. */ + MEMTX_ENGINE_COLLECT_GARBAGE, + /** Postpone deletion of objects. */ + MEMTX_ENGINE_DELAYED_FREE, +}; + /** * The state of memtx recovery process. * There is a global state of the entire engine state of each @@ -178,6 +190,12 @@ struct memtx_engine { * memtx_gc_task::link. */ struct stailq gc_queue; + /** + * Free mode, determines a strategy for freeing up memory + */ + enum memtx_engine_free_mode free_mode; + /** List of tuples for delayed free. */ + struct lifo delayed; }; struct memtx_gc_task; diff --git a/src/lib/small b/src/lib/small index e610d491b3..8c3a4be199 160000 --- a/src/lib/small +++ b/src/lib/small @@ -1 +1 @@ -Subproject commit e610d491b30676d1d05caa106dd865123b2ed35e +Subproject commit 8c3a4be19932458a16618eb86c906928c984b271 diff --git a/test/box/moved_delayed_free_mode.result b/test/box/moved_delayed_free_mode.result new file mode 100644 index 0000000000..9d0f6730df --- /dev/null +++ b/test/box/moved_delayed_free_mode.result @@ -0,0 +1,84 @@ +-- test-run result file version 2 +env = require('test_run') + | --- + | ... +fiber = require('fiber') + | --- + | ... +test_run = env.new() + | --- + | ... + +objcount = 10000 + | --- + | ... +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... +function test(op, count, space) + if op == 1 then + for key = 1, count do + space:insert({key, key + 1000}) + end + elseif op == 2 then + for key = 1, count do + space:replace({key, key + 5000}) + end + elseif op == 3 then + for key = 1, count do + space:upsert({key, key + 5000}, {{'=', 2, key + 10000}}) + end + elseif op == 4 then + for key = 1, count do + space:delete({key}) + end + else + assert(0) + end +end; + | --- + | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... + +space = box.schema.space.create('test') + | --- + | ... +space:format({ {name = 'id', type = 'unsigned'}, \ + {name = 'year', type = 'unsigned'} }) + | --- + | ... +_ = space:create_index('primary', { parts = {'id'} }) + | --- + | ... +channel = fiber.channel(1) + | --- + | ... + +-- Try to insert/replace/upsert/delete tuples in fiber +-- in parallel with the snapshot creation. +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... +for op = 1, 4 do + _ = fiber.create(function() + test(op, objcount, space) + channel:put(true) + end) + box.snapshot() + assert(channel:get() == true) +end; + | --- + | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... + +space:drop() + | --- + | ... diff --git a/test/box/moved_delayed_free_mode.test.lua b/test/box/moved_delayed_free_mode.test.lua new file mode 100644 index 0000000000..4fcd18d7af --- /dev/null +++ b/test/box/moved_delayed_free_mode.test.lua @@ -0,0 +1,49 @@ +env = require('test_run') +fiber = require('fiber') +test_run = env.new() + +objcount = 10000 +test_run:cmd("setopt delimiter ';'") +function test(op, count, space) + if op == 1 then + for key = 1, count do + space:insert({key, key + 1000}) + end + elseif op == 2 then + for key = 1, count do + space:replace({key, key + 5000}) + end + elseif op == 3 then + for key = 1, count do + space:upsert({key, key + 5000}, {{'=', 2, key + 10000}}) + end + elseif op == 4 then + for key = 1, count do + space:delete({key}) + end + else + assert(0) + end +end; +test_run:cmd("setopt delimiter ''"); + +space = box.schema.space.create('test') +space:format({ {name = 'id', type = 'unsigned'}, \ + {name = 'year', type = 'unsigned'} }) +_ = space:create_index('primary', { parts = {'id'} }) +channel = fiber.channel(1) + +-- Try to insert/replace/upsert/delete tuples in fiber +-- in parallel with the snapshot creation. +test_run:cmd("setopt delimiter ';'") +for op = 1, 4 do + _ = fiber.create(function() + test(op, objcount, space) + channel:put(true) + end) + box.snapshot() + assert(channel:get() == true) +end; +test_run:cmd("setopt delimiter ''"); + +space:drop() -- GitLab