From c3145e2e8338a5a7ce5d27b0d0813eb65b8fa7cb Mon Sep 17 00:00:00 2001
From: Dmitry Simonenko <pmwkaa@gmail.com>
Date: Mon, 23 Sep 2013 18:46:31 +0400
Subject: [PATCH] shared-arena: shared arena, closes #36 Allocate slab arena in
 shared memory to reduce page splits during snapshotting. Adaptation of patch
 made by Yuri Nevinitsin @nevinitsin.

---
 include/salloc.h |   5 +-
 src/box/tuple.cc |   2 +-
 src/salloc.cc    | 182 ++++++++++++++++++++++++++++++++++++++---------
 src/tarantool.cc |  15 ++++
 4 files changed, 169 insertions(+), 35 deletions(-)

diff --git a/include/salloc.h b/include/salloc.h
index 4b7e5e97e4..0f8d3662f9 100644
--- a/include/salloc.h
+++ b/include/salloc.h
@@ -37,9 +37,12 @@ struct tbuf;
 bool salloc_init(size_t size, size_t minimal, double factor);
 void salloc_free(void);
 void *salloc(size_t size, const char *what);
-void sfree(void *ptr);
+void sfree(void *ptr, const char *what);
 void slab_validate();
 
+void salloc_reattach(void);
+void salloc_batch_mode(bool mode);
+
 /** Statistics on utilization of a single slab class. */
 struct slab_cache_stats {
 	int64_t item_size;
diff --git a/src/box/tuple.cc b/src/box/tuple.cc
index 0daec17367..581920fd7c 100644
--- a/src/box/tuple.cc
+++ b/src/box/tuple.cc
@@ -229,7 +229,7 @@ tuple_free(struct tuple *tuple)
 	say_debug("tuple_free(%p)", tuple);
 	assert(tuple->refs == 0);
 	char *ptr = (char *) tuple - tuple_format(tuple)->field_map_size;
-	sfree(ptr);
+	sfree(ptr, "tuple");
 }
 
 /**
diff --git a/src/salloc.cc b/src/salloc.cc
index 12257843c6..3ba155dd3a 100644
--- a/src/salloc.cc
+++ b/src/salloc.cc
@@ -60,6 +60,77 @@ static const size_t MAX_SLAB_ITEM = 1 << 20;
 /* updated in slab_classes_init, depends on salloc_init params */
 size_t MAX_SLAB_ITEM_COUNT;
 
+/*
+ *  slab delayed free queue.
+*/
+#define SLAB_Q_WATERMARK (512 * sizeof(void*))
+
+struct slab_q {
+	char *buf;
+	size_t bottom; /* advanced by batch free */
+	size_t top;
+	size_t size;   /* total buffer size */
+};
+
+static inline int
+slab_qinit(struct slab_q *q, size_t size) {
+	q->size = size;
+	q->bottom = 0;
+	q->top = 0;
+	q->buf = (char*)malloc(size);
+	return (q->buf == NULL ? -1 : 0);
+}
+
+static inline void
+slab_qfree(struct slab_q *q) {
+	if (q->buf) {
+		free(q->buf);
+		q->buf = NULL;
+	}
+}
+
+#ifndef unlikely
+# define unlikely(EXPR) __builtin_expect(!! (EXPR), 0)
+#endif
+
+static inline int
+slab_qpush(struct slab_q *q, void *ptr)
+{
+	/* compact the queue: once the consumed prefix reaches the
+	 * watermark, slide the live region [bottom, top) back to the
+	 * start of the buffer instead of growing it. */
+	if (unlikely(q->bottom >= SLAB_Q_WATERMARK)) {
+		memmove(q->buf, q->buf + q->bottom, q->top - q->bottom);
+		q->top -= q->bottom;
+		q->bottom = 0;
+	}
+	if (unlikely((q->top + sizeof(void*)) > q->size)) {
+		size_t newsize = q->size * 2;
+		char *nbuf = (char*)realloc((void*)q->buf, newsize);
+		if (unlikely(nbuf == NULL))
+			return -1;
+		q->buf = nbuf;
+		q->size = newsize;
+	}
+	memcpy(q->buf + q->top, (char*)&ptr, sizeof(ptr));
+	q->top += sizeof(void*);
+	return 0;
+}
+
+static inline int
+slab_qn(struct slab_q *q) {
+	return (q->top - q->bottom) / sizeof(void*);
+}
+
+static inline void*
+slab_qpop(struct slab_q *q) {
+	if (unlikely(q->bottom == q->top))
+		return NULL;
+	void *ret = *(void**)(q->buf + q->bottom);
+	q->bottom += sizeof(void*);
+	return ret;
+}
+
 struct slab_item {
 	struct slab_item *next;
 };
@@ -88,7 +159,9 @@ struct slab_cache {
 struct arena {
 	void *mmap_base;
 	size_t mmap_size;
-
+	bool delayed_free_mode;
+	size_t delayed_free_batch;
+	struct slab_q delayed_q;
 	void *base;
 	size_t size;
 	size_t used;
@@ -131,14 +204,22 @@ slab_caches_init(size_t minimal, double factor)
 static bool
 arena_init(struct arena *arena, size_t size)
 {
+	arena->delayed_free_mode = false;
+	arena->delayed_free_batch = 100;
+
+	int rc = slab_qinit(&arena->delayed_q, 4096);
+	if (rc == -1)
+		return false;
+
 	arena->used = 0;
 	arena->size = size - size % SLAB_SIZE;
 	arena->mmap_size = size - size % SLAB_SIZE + SLAB_SIZE;	/* spend SLAB_SIZE bytes on align :-( */
 
 	arena->mmap_base = mmap(NULL, arena->mmap_size,
-				PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
+				PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
 	if (arena->mmap_base == MAP_FAILED) {
 		say_syserror("mmap");
+		slab_qfree(&arena->delayed_q);
 		return false;
 	}
 
@@ -149,6 +230,14 @@ arena_init(struct arena *arena, size_t size)
 	return true;
 }
 
+void salloc_reattach(void) {
+	mprotect(arena.mmap_base, arena.mmap_size, PROT_READ);
+}
+
+void salloc_batch_mode(bool mode) {
+	arena.delayed_free_mode = mode;
+}
+
 static void *
 arena_alloc(struct arena *arena)
 {
@@ -182,7 +271,7 @@ salloc_free(void)
 {
 	if (arena.mmap_base != NULL)
 		munmap(arena.mmap_base, arena.mmap_size);
-
+	slab_qfree(&arena.delayed_q);
 	memset(&arena, 0, sizeof(struct arena));
 }
 
@@ -269,6 +358,60 @@ valid_item(struct slab *slab, void *item)
 }
 #endif
 
+static void
+sfree_do(void *ptr)
+{
+	struct slab *slab = slab_header(ptr);
+	struct slab_cache *cache = slab->cache;
+	struct slab_item *item = (struct slab_item *) ptr;
+
+	if (fully_formatted(slab) && slab->free == NULL)
+		TAILQ_INSERT_TAIL(&cache->free_slabs, slab, cache_free_link);
+
+	assert(valid_item(slab, item));
+	assert(slab->free == NULL || valid_item(slab, slab->free));
+
+	item->next = slab->free;
+	slab->free = item;
+	slab->used -= cache->item_size + sizeof(red_zone);
+	slab->items -= 1;
+
+	if (slab->items == 0) {
+		TAILQ_REMOVE(&cache->free_slabs, slab, cache_free_link);
+		TAILQ_REMOVE(&cache->slabs, slab, cache_link);
+		SLIST_INSERT_HEAD(&arena.free_slabs, slab, free_link);
+	}
+
+	VALGRIND_FREELIKE_BLOCK(item, sizeof(red_zone));
+}
+
+static void
+sfree_batch(void)
+{
+	ssize_t batch = arena.delayed_free_batch;
+	size_t n = slab_qn(&arena.delayed_q);
+	while (batch-- > 0 && n-- > 0) {
+		void *ptr = slab_qpop(&arena.delayed_q);
+		assert(ptr != NULL);
+		sfree_do(ptr);
+	}
+}
+
+void
+sfree(void *ptr, const char *what)
+{
+	if (ptr == NULL)
+		return;
+	if (arena.delayed_free_mode) {
+		if (slab_qpush(&arena.delayed_q, ptr) == -1)
+			tnt_raise(LoggedError, ER_MEMORY_ISSUE, arena.delayed_q.size * 2,
+				  "slab allocator", what);
+		return;
+	}
+	sfree_batch();
+	return sfree_do(ptr);
+}
+
 void *
 salloc(size_t size, const char *what)
 {
@@ -276,6 +419,9 @@ salloc(size_t size, const char *what)
 	struct slab *slab;
 	struct slab_item *item;
 
+	if (! arena.delayed_free_mode)
+		sfree_batch();
+
 	if ((cache = cache_for(size)) == NULL ||
 	    (slab = slab_of(cache)) == NULL) {
 
@@ -307,36 +453,6 @@ salloc(size_t size, const char *what)
 	return (void *)item;
 }
 
-void
-sfree(void *ptr)
-{
-	if (ptr == NULL)
-		return;
-	struct slab *slab = slab_header(ptr);
-	struct slab_cache *cache = slab->cache;
-	struct slab_item *item = (struct slab_item *) ptr;
-
-	if (fully_formatted(slab) && slab->free == NULL)
-		TAILQ_INSERT_TAIL(&cache->free_slabs, slab, cache_free_link);
-
-	assert(valid_item(slab, item));
-	assert(slab->free == NULL || valid_item(slab, slab->free));
-
-	item->next = slab->free;
-	slab->free = item;
-	slab->used -= cache->item_size + sizeof(red_zone);
-	slab->items -= 1;
-
-	if (slab->items == 0) {
-		TAILQ_REMOVE(&cache->free_slabs, slab, cache_free_link);
-		TAILQ_REMOVE(&cache->slabs, slab, cache_link);
-		SLIST_INSERT_HEAD(&arena.free_slabs, slab, free_link);
-	}
-
-	VALGRIND_FREELIKE_BLOCK(item, sizeof(red_zone));
-}
-
-
 size_t
 salloc_ptr_to_index(void *ptr)
 {
diff --git a/src/tarantool.cc b/src/tarantool.cc
index a418323e7f..22860a1dfe 100644
--- a/src/tarantool.cc
+++ b/src/tarantool.cc
@@ -316,12 +316,20 @@ tarantool_uptime(void)
 	return ev_now() - start_time;
 }
 
+void snapshot_exit(int code, void* arg) {
+	(void)arg;
+	fflush(NULL);
+	_exit(code);
+}
+
 int
 snapshot(void)
 {
 	if (snapshot_pid)
 		return EINPROGRESS;
 
+	salloc_batch_mode(true);
+
 	pid_t p = fork();
 	if (p < 0) {
 		say_syserror("fork");
@@ -329,11 +337,16 @@ snapshot(void)
 	}
 	if (p > 0) {
 		snapshot_pid = p;
+		say_warn("waiting for dumper %d", p);
 		int status = wait_for_child(p);
+		say_warn("dumper finished %d", p);
+		salloc_batch_mode(false);
 		snapshot_pid = 0;
 		return (WIFSIGNALED(status) ? EINTR : WEXITSTATUS(status));
 	}
 
+	salloc_reattach();
+
 	fiber_set_name(fiber, "dumper");
 	set_proc_title("dumper (%" PRIu32 ")", getppid());
 
@@ -342,6 +355,8 @@ snapshot(void)
 	 * parent stdio buffers at exit().
 	 */
 	close_all_xcpt(1, sayfd);
+
+	on_exit(snapshot_exit, NULL);
 	snapshot_save(recovery_state, box_snapshot);
 
 	exit(EXIT_SUCCESS);
-- 
GitLab