Skip to content
Snippets Groups Projects
Commit 246a89ef authored by Konstantin Osipov's avatar Konstantin Osipov
Browse files

gh-2532: use region allocator for struct vy_log_record

parent 08590def
No related branches found
No related tags found
No related merge requests found
......@@ -136,8 +136,8 @@ struct vy_log {
* Used by vy_log_next_id().
*/
int64_t next_id;
/** Mempool for struct vy_log_record. */
struct mempool record_pool;
/** A region of struct vy_log_record entries. */
struct region pool;
/**
* Records awaiting to be written to disk.
* Linked by vy_log_record::in_tx;
......@@ -145,6 +145,8 @@ struct vy_log {
struct stailq tx;
/** Number of entries in the @tx list. */
int tx_size;
/** Start of the current transaction in the pool, for rollback */
size_t tx_svp;
/** First record of the current transaction. */
struct stailq_entry *tx_begin;
/**
......@@ -664,12 +666,14 @@ vy_log_record_decode(struct vy_log_record *record,
* are duplicated as well.
*/
static struct vy_log_record *
vy_log_record_dup(struct mempool *pool, const struct vy_log_record *src)
vy_log_record_dup(struct region *pool, const struct vy_log_record *src)
{
struct vy_log_record *dst = mempool_alloc(pool);
size_t used = region_used(pool);
struct vy_log_record *dst = region_alloc(pool, sizeof(*dst));
if (dst == NULL) {
diag_set(OutOfMemory, sizeof(*dst),
"malloc", "struct vy_log_record");
"region", "struct vy_log_record");
goto err;
}
*dst = *src;
......@@ -677,11 +681,11 @@ vy_log_record_dup(struct mempool *pool, const struct vy_log_record *src)
const char *data = src->begin;
mp_next(&data);
size_t size = data - src->begin;
dst->begin = malloc(size);
dst->begin = region_alloc(pool, size);
if (dst->begin == NULL) {
diag_set(OutOfMemory, size, "malloc",
"struct vy_log_record");
goto err_begin;
diag_set(OutOfMemory, size, "region",
"vy_log_record::begin");
goto err;
}
memcpy((char *)dst->begin, src->begin, size);
}
......@@ -689,50 +693,34 @@ vy_log_record_dup(struct mempool *pool, const struct vy_log_record *src)
const char *data = src->end;
mp_next(&data);
size_t size = data - src->end;
dst->end = malloc(size);
dst->end = region_alloc(pool, size);
if (dst->end == NULL) {
diag_set(OutOfMemory, size, "malloc",
diag_set(OutOfMemory, size, "region",
"struct vy_log_record");
goto err_end;
goto err;
}
memcpy((char *)dst->end, src->end, size);
}
if (src->key_def != NULL) {
dst->key_def = key_def_dup(src->key_def);
size_t size = key_def_sizeof(src->key_def->part_count);
dst->key_def = region_alloc(pool, size);
if (dst->key_def == NULL)
goto err_key_def;
goto err;
memcpy((char*) dst->key_def, src->key_def, size);
}
return dst;
err_key_def:
free((char *)dst->end);
err_end:
free((char *)dst->begin);
err_begin:
mempool_free(pool, dst);
err:
region_truncate(pool, used);
return NULL;
}
/**
* Free a log record and all objects it refers to.
*/
static void
vy_log_record_free(struct mempool *pool, struct vy_log_record *record)
{
free((char *)record->begin);
free((char *)record->end);
free((struct key_def *)record->key_def);
mempool_free(pool, record);
}
void
vy_log_init(const char *dir)
{
xdir_create(&vy_log.dir, dir, VYLOG, &INSTANCE_UUID);
latch_create(&vy_log.latch);
mempool_create(&vy_log.record_pool, cord_slab_cache(),
sizeof(struct vy_log_record));
region_create(&vy_log.pool, cord_slab_cache());
stailq_create(&vy_log.tx);
diag_create(&vy_log.tx_diag);
wal_init_vy_log();
......@@ -790,11 +778,10 @@ vy_log_flush(void)
return -1;
/* Success. Free flushed records. */
struct vy_log_record *next;
stailq_foreach_entry_safe(record, next, &vy_log.tx, in_tx)
vy_log_record_free(&vy_log.record_pool, record);
region_reset(&vy_log.pool);
stailq_create(&vy_log.tx);
vy_log.tx_size = 0;
vy_log.tx_svp = 0;
return 0;
}
......@@ -803,7 +790,7 @@ vy_log_free(void)
{
xdir_destroy(&vy_log.dir);
latch_destroy(&vy_log.latch);
mempool_destroy(&vy_log.record_pool);
region_destroy(&vy_log.pool);
diag_destroy(&vy_log.tx_diag);
}
......@@ -1149,7 +1136,6 @@ vy_log_tx_begin(void)
static int
vy_log_tx_do_commit(bool no_discard)
{
struct vy_log_record *record, *next;
struct stailq rollback;
int rc = 0;
......@@ -1196,11 +1182,10 @@ vy_log_tx_do_commit(bool no_discard)
rollback:
stailq_create(&rollback);
stailq_splice(&vy_log.tx, vy_log.tx_begin, &rollback);
stailq_foreach_entry_safe(record, next, &rollback, in_tx) {
assert(vy_log.tx_size > 0);
vy_log.tx_size--;
vy_log_record_free(&vy_log.record_pool, record);
}
region_truncate(&vy_log.pool, vy_log.tx_svp);
vy_log.tx_size = 0;
vy_log.tx_svp = 0;
vy_log.tx_begin = NULL;
goto out;
}
......@@ -1223,8 +1208,10 @@ vy_log_write(const struct vy_log_record *record)
say_debug("%s: %s", __func__, vy_log_record_str(record));
struct vy_log_record *tx_record;
tx_record = vy_log_record_dup(&vy_log.record_pool, record);
size_t svp = region_used(&vy_log.pool);
struct vy_log_record *tx_record = vy_log_record_dup(&vy_log.pool,
record);
if (tx_record == NULL) {
diag_move(diag_get(), &vy_log.tx_diag);
vy_log.tx_failed = true;
......@@ -1233,8 +1220,10 @@ vy_log_write(const struct vy_log_record *record)
stailq_add_tail_entry(&vy_log.tx, tx_record, in_tx);
vy_log.tx_size++;
if (vy_log.tx_begin == NULL)
if (vy_log.tx_begin == NULL) {
vy_log.tx_begin = &tx_record->in_tx;
vy_log.tx_svp = svp;
}
}
/** Lookup a vinyl index in vy_recovery::index_hash map. */
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment