Vladislav Shpilevoy authored

Some error injections use usleep() to put the current thread to sleep, for example to simulate a slow WAL thread. A few of them allow passing a custom delay, converted to microseconds as usleep(injection->dvalue * 1000000), where dvalue is assumed to be a number of seconds. But the usleep() argument is useconds_t, which is only 4 bytes wide (effectively uint32_t), at least on Mac, so a sufficiently large dvalue easily overflows it.

The patch switches to nanosleep() via a new wrapper, thread_sleep(). It takes a double value and does not truncate it to 4 bytes.

The overflow occurred with ERRINJ_VY_READ_PAGE_TIMEOUT = 9000 in test/vinyl/errinj_vylog.test.lua and ERRINJ_VY_RUN_WRITE_STMT_TIMEOUT = 9000 in test/vinyl/errinj.test.lua.

Part of #4609
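The fix replaces usleep() with a helper that takes the delay as a double number of seconds and converts it to a struct timespec for nanosleep(), so the value is never squeezed through a 4-byte useconds_t. A minimal sketch of such a wrapper is below; the signature matches the thread_sleep() used in the file, but the body and its location (presumably a shared utility header) are assumptions, not the patch's exact code:

#include <stdint.h>
#include <time.h>

/* Sketch only: assumed implementation of the thread_sleep() wrapper. */
static inline void
thread_sleep(double sec)
{
        /* Convert seconds to nanoseconds without a 32-bit truncation. */
        uint64_t ns = (uint64_t)(sec * 1e9);
        struct timespec ts = {
                .tv_sec = (time_t)(ns / 1000000000),
                .tv_nsec = (long)(ns % 1000000000),
        };
        nanosleep(&ts, NULL);
}

With ERRINJ_VY_RUN_WRITE_STMT_TIMEOUT = 9000 this sleeps the full 9000 seconds, while usleep(9000 * 1000000) would wrap the 32-bit useconds_t around to roughly 410 seconds.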
vy_scheduler.c
/*
* Copyright 2010-2017, Tarantool AUTHORS, please see AUTHORS file.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* 1. Redistributions of source code must retain the above
* copyright notice, this list of conditions and the
* following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#include "vy_scheduler.h"
#include <assert.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <small/rlist.h>
#include <tarantool_ev.h>
#include "diag.h"
#include "errcode.h"
#include "errinj.h"
#include "fiber.h"
#include "fiber_cond.h"
#include "cbus.h"
#include "salad/stailq.h"
#include "say.h"
#include "txn.h"
#include "space.h"
#include "schema.h"
#include "xrow.h"
#include "vy_lsm.h"
#include "vy_log.h"
#include "vy_mem.h"
#include "vy_range.h"
#include "vy_run.h"
#include "vy_write_iterator.h"
#include "trivia/util.h"
/* Min and max values for vy_scheduler::timeout. */
#define VY_SCHEDULER_TIMEOUT_MIN 1
#define VY_SCHEDULER_TIMEOUT_MAX 60
static int vy_worker_f(va_list);
static int vy_scheduler_f(va_list);
static void vy_task_execute_f(struct cmsg *);
static void vy_task_complete_f(struct cmsg *);
static void vy_deferred_delete_batch_process_f(struct cmsg *);
static void vy_deferred_delete_batch_free_f(struct cmsg *);
static const struct cmsg_hop vy_task_execute_route[] = {
{ vy_task_execute_f, NULL },
};
static const struct cmsg_hop vy_task_complete_route[] = {
{ vy_task_complete_f, NULL },
};
struct vy_task;
/** Vinyl worker thread. */
struct vy_worker {
struct cord cord;
/** Pipe from tx to the worker thread. */
struct cpipe worker_pipe;
/** Pipe from the worker thread to tx. */
struct cpipe tx_pipe;
/** Pool this worker was allocated from. */
struct vy_worker_pool *pool;
/**
* Task that is currently being executed by the worker
* or NULL if the worker is idle.
*/
struct vy_task *task;
/** Link in vy_worker_pool::idle_workers. */
struct stailq_entry in_idle;
/** Route for sending deferred DELETEs back to tx. */
struct cmsg_hop deferred_delete_route[2];
};
/** Max number of statements in a batch of deferred DELETEs. */
enum { VY_DEFERRED_DELETE_BATCH_MAX = 100 };
/** Deferred DELETE statement. */
struct vy_deferred_delete_stmt {
/** Overwritten tuple. */
struct tuple *old_stmt;
/** Statement that overwrote @old_stmt. */
struct tuple *new_stmt;
};
/**
* Batch of deferred DELETE statements generated during
* a primary index compaction.
*/
struct vy_deferred_delete_batch {
/** CBus messages for sending the batch to tx. */
struct cmsg cmsg;
/** Task that generated this batch. */
struct vy_task *task;
/** Set if the tx thread failed to process the batch. */
bool is_failed;
/** In case of failure the error is stored here. */
struct diag diag;
/** Number of elements actually stored in @stmt array. */
int count;
/** Array of deferred DELETE statements. */
struct vy_deferred_delete_stmt stmt[VY_DEFERRED_DELETE_BATCH_MAX];
};
struct vy_task_ops {
/**
* This function is called from a worker. It is supposed to do work
* which is too heavy for the tx thread (like IO or compression).
* Returns 0 on success. On failure returns -1 and sets diag.
*/
int (*execute)(struct vy_task *task);
/**
* This function is called by the scheduler upon task completion.
* It may be used to finish the task from the tx thread context.
*
* Returns 0 on success. On failure returns -1 and sets diag.
*/
int (*complete)(struct vy_task *task);
/**
* This function is called by the scheduler if either ->execute
* or ->complete failed. It may be used to undo changes done to
* the LSM tree when preparing the task.
*/
void (*abort)(struct vy_task *task);
};
struct vy_task {
/**
* CBus message used for sending the task to/from
* a worker thread.
*/
struct cmsg cmsg;
/** Virtual method table. */
const struct vy_task_ops *ops;
/** Pointer to the scheduler. */
struct vy_scheduler *scheduler;
/** Worker thread this task is assigned to. */
struct vy_worker *worker;
/**
* Fiber that is currently executing this task in
* a worker thread.
*/
struct fiber *fiber;
/** Time of the task creation. */
double start_time;
/** Set if the task failed. */
bool is_failed;
/** In case of task failure the error is stored here. */
struct diag diag;
/** LSM tree this task is for. */
struct vy_lsm *lsm;
/**
* Copies of lsm->key/cmp_def to protect from
* multithread read/write on alter.
*/
struct key_def *cmp_def;
struct key_def *key_def;
/** Range to compact. */
struct vy_range *range;
/** Run written by this task. */
struct vy_run *new_run;
/** Write iterator producing statements for the new run. */
struct vy_stmt_stream *wi;
/**
* First (newest) and last (oldest) slices to compact.
*
* While a compaction task is in progress, a new slice
* can be added to a range by concurrent dump, so we
* need to remember the slices we are compacting.
*/
struct vy_slice *first_slice, *last_slice;
/**
* Index options may be modified while a task is in
* progress so we save them here to safely access them
* from another thread.
*/
double bloom_fpr;
int64_t page_size;
/**
* Deferred DELETE handler passed to the write iterator.
* It sends deferred DELETE statements generated during
* primary index compaction back to tx.
*/
struct vy_deferred_delete_handler deferred_delete_handler;
/** Batch of deferred deletes generated by this task. */
struct vy_deferred_delete_batch *deferred_delete_batch;
/**
* Number of batches of deferred DELETEs sent to tx
* and not yet processed.
*/
int deferred_delete_in_progress;
/** Link in vy_scheduler::processed_tasks. */
struct stailq_entry in_processed;
};
static const struct vy_deferred_delete_handler_iface
vy_task_deferred_delete_iface;
/**
* Allocate a new task to be executed by a worker thread.
* When preparing an asynchronous task, this function must
* be called before yielding the current fiber in order to
* pin the LSM tree the task is for so that a concurrent fiber
* does not free it from under us.
*/
static struct vy_task *
vy_task_new(struct vy_scheduler *scheduler, struct vy_worker *worker,
struct vy_lsm *lsm, const struct vy_task_ops *ops)
{
struct vy_task *task = calloc(1, sizeof(*task));
if (task == NULL) {
diag_set(OutOfMemory, sizeof(*task),
"malloc", "struct vy_task");
return NULL;
}
memset(task, 0, sizeof(*task));
task->ops = ops;
task->scheduler = scheduler;
task->worker = worker;
task->start_time = ev_monotonic_now(loop());
task->lsm = lsm;
task->cmp_def = key_def_dup(lsm->cmp_def);
if (task->cmp_def == NULL) {
free(task);
return NULL;
}
task->key_def = key_def_dup(lsm->key_def);
if (task->key_def == NULL) {
key_def_delete(task->cmp_def);
free(task);
return NULL;
}
vy_lsm_ref(lsm);
diag_create(&task->diag);
task->deferred_delete_handler.iface = &vy_task_deferred_delete_iface;
return task;
}
/** Free a task allocated with vy_task_new(). */
static void
vy_task_delete(struct vy_task *task)
{
assert(task->deferred_delete_batch == NULL);
assert(task->deferred_delete_in_progress == 0);
key_def_delete(task->cmp_def);
key_def_delete(task->key_def);
vy_lsm_unref(task->lsm);
diag_destroy(&task->diag);
free(task);
}
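/**
* Dump heap comparator: returns true if LSM tree i1 should be
* dumped before LSM tree i2.
*/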
static bool
vy_dump_heap_less(struct vy_lsm *i1, struct vy_lsm *i2)
{
/*
* LSM trees that are currently being dumped or can't be
* scheduled for dump right now are moved off the top of
* the heap.
*/
if (i1->is_dumping != i2->is_dumping)
return i1->is_dumping < i2->is_dumping;
if (i1->pin_count != i2->pin_count)
return i1->pin_count < i2->pin_count;
/* Older LSM trees are dumped first. */
int64_t i1_generation = vy_lsm_generation(i1);
int64_t i2_generation = vy_lsm_generation(i2);
if (i1_generation != i2_generation)
return i1_generation < i2_generation;
/*
* If a space has more than one index, appending a statement
* to it requires reading the primary index to get the old
* tuple and delete it from secondary indexes. This means that
* on local recovery from WAL, the primary index must not be
* ahead of secondary indexes of the same space, i.e. it must
* be dumped last.
*/
return i1->index_id > i2->index_id;
}
#define HEAP_NAME vy_dump_heap
#define HEAP_LESS(h, l, r) vy_dump_heap_less(l, r)
#define heap_value_t struct vy_lsm
#define heap_value_attr in_dump
#include "salad/heap.h"
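/**
* Compaction heap comparator: the top of the heap is the LSM
* tree whose compaction is expected to reduce read amplification
* the most.
*/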
static bool
vy_compaction_heap_less(struct vy_lsm *i1, struct vy_lsm *i2)
{
/*
* Prefer LSM trees whose read amplification will be reduced
* most as a result of compaction.
*/
return vy_lsm_compaction_priority(i1) > vy_lsm_compaction_priority(i2);
}
#define HEAP_NAME vy_compaction_heap
#define HEAP_LESS(h, l, r) vy_compaction_heap_less(l, r)
#define heap_value_t struct vy_lsm
#define heap_value_attr in_compaction
#include "salad/heap.h"
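/**
* Start all threads of a worker pool and initialize the cbus
* route each worker uses to send deferred DELETE batches to tx
* for processing and back to the worker for cleanup.
*/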
static void
vy_worker_pool_start(struct vy_worker_pool *pool)
{
assert(pool->workers == NULL);
pool->workers = calloc(pool->size, sizeof(*pool->workers));
if (pool->workers == NULL)
panic("failed to allocate vinyl worker pool");
for (int i = 0; i < pool->size; i++) {
char name[FIBER_NAME_MAX];
snprintf(name, sizeof(name), "vinyl.%s.%d", pool->name, i);
struct vy_worker *worker = &pool->workers[i];
if (cord_costart(&worker->cord, name, vy_worker_f, worker) != 0)
panic("failed to start vinyl worker thread");
worker->pool = pool;
cpipe_create(&worker->worker_pipe, name);
stailq_add_tail_entry(&pool->idle_workers, worker, in_idle);
struct cmsg_hop *route = worker->deferred_delete_route;
route[0].f = vy_deferred_delete_batch_process_f;
route[0].pipe = &worker->worker_pipe;
route[1].f = vy_deferred_delete_batch_free_f;
route[1].pipe = NULL;
}
}
static void
vy_worker_pool_stop(struct vy_worker_pool *pool)
{
assert(pool->workers != NULL);
for (int i = 0; i < pool->size; i++) {
struct vy_worker *worker = &pool->workers[i];
tt_pthread_cancel(worker->cord.id);
tt_pthread_join(worker->cord.id, NULL);
}
free(pool->workers);
pool->workers = NULL;
}
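/**
* Initialize a worker pool. Worker threads are not started until
* the pool is first used, see vy_worker_pool_get().
*/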
static void
vy_worker_pool_create(struct vy_worker_pool *pool, const char *name, int size)
{
pool->name = name;
pool->size = size;
pool->workers = NULL;
stailq_create(&pool->idle_workers);
}
static void
vy_worker_pool_destroy(struct vy_worker_pool *pool)
{
if (pool->workers != NULL)
vy_worker_pool_stop(pool);
}
/**
* Get an idle worker from a pool.
*/
static struct vy_worker *
vy_worker_pool_get(struct vy_worker_pool *pool)
{
/*
* Start worker threads only when a task is scheduled
* so that they are not dangling around if vinyl is
* not used.
*/
if (pool->workers == NULL)
vy_worker_pool_start(pool);
struct vy_worker *worker = NULL;
if (!stailq_empty(&pool->idle_workers)) {
worker = stailq_shift_entry(&pool->idle_workers,
struct vy_worker, in_idle);
assert(worker->pool == pool);
}
return worker;
}
/**
* Put a worker back to the pool it was allocated from once
* it's done its job.
*/
static void
vy_worker_pool_put(struct vy_worker *worker)
{
struct vy_worker_pool *pool = worker->pool;
stailq_add_entry(&pool->idle_workers, worker, in_idle);
}
void
vy_scheduler_create(struct vy_scheduler *scheduler, int write_threads,
vy_scheduler_dump_complete_f dump_complete_cb,
struct vy_run_env *run_env, struct rlist *read_views)
{
memset(scheduler, 0, sizeof(*scheduler));
scheduler->dump_complete_cb = dump_complete_cb;
scheduler->read_views = read_views;
scheduler->run_env = run_env;
scheduler->scheduler_fiber = fiber_new("vinyl.scheduler",
vy_scheduler_f);
if (scheduler->scheduler_fiber == NULL)
panic("failed to allocate vinyl scheduler fiber");
fiber_cond_create(&scheduler->scheduler_cond);
/*
* Dump tasks must be scheduled as soon as possible,
* otherwise we may run out of memory quota and have
* to stall transactions. To avoid unpredictably long
* stalls caused by ongoing compaction tasks blocking
* scheduling of dump tasks, we use separate thread
* pools for dump and compaction tasks.
*
* Since a design based on LSM trees typically implies
* high write amplification, we allocate only 1/4th of
* all available threads to dump tasks while the rest
* is used exclusively for compaction.
*/
assert(write_threads > 1);
int dump_threads = MAX(1, write_threads / 4);
int compaction_threads = write_threads - dump_threads;
vy_worker_pool_create(&scheduler->dump_pool,
"dump", dump_threads);
vy_worker_pool_create(&scheduler->compaction_pool,
"compaction", compaction_threads);
stailq_create(&scheduler->processed_tasks);
vy_dump_heap_create(&scheduler->dump_heap);
vy_compaction_heap_create(&scheduler->compaction_heap);
diag_create(&scheduler->diag);
fiber_cond_create(&scheduler->dump_cond);
}
void
vy_scheduler_start(struct vy_scheduler *scheduler)
{
fiber_start(scheduler->scheduler_fiber, scheduler);
}
void
vy_scheduler_destroy(struct vy_scheduler *scheduler)
{
/* Stop scheduler fiber. */
scheduler->scheduler_fiber = NULL;
/* Sic: fiber_cancel() can't be used here. */
fiber_cond_signal(&scheduler->dump_cond);
fiber_cond_signal(&scheduler->scheduler_cond);
vy_worker_pool_destroy(&scheduler->dump_pool);
vy_worker_pool_destroy(&scheduler->compaction_pool);
diag_destroy(&scheduler->diag);
fiber_cond_destroy(&scheduler->dump_cond);
fiber_cond_destroy(&scheduler->scheduler_cond);
vy_dump_heap_destroy(&scheduler->dump_heap);
vy_compaction_heap_destroy(&scheduler->compaction_heap);
TRASH(scheduler);
}
void
vy_scheduler_reset_stat(struct vy_scheduler *scheduler)
{
struct vy_scheduler_stat *stat = &scheduler->stat;
stat->tasks_completed = 0;
stat->tasks_failed = 0;
stat->dump_count = 0;
stat->dump_time = 0;
stat->dump_input = 0;
stat->dump_output = 0;
stat->compaction_time = 0;
stat->compaction_input = 0;
stat->compaction_output = 0;
}
static int
vy_scheduler_on_delete_lsm(struct trigger *trigger, void *event)
{
struct vy_lsm *lsm = event;
struct vy_scheduler *scheduler = trigger->data;
assert(! heap_node_is_stray(&lsm->in_dump));
assert(! heap_node_is_stray(&lsm->in_compaction));
vy_dump_heap_delete(&scheduler->dump_heap, lsm);
vy_compaction_heap_delete(&scheduler->compaction_heap, lsm);
trigger_clear(trigger);
free(trigger);
return 0;
}
int
vy_scheduler_add_lsm(struct vy_scheduler *scheduler, struct vy_lsm *lsm)
{
assert(heap_node_is_stray(&lsm->in_dump));
assert(heap_node_is_stray(&lsm->in_compaction));
/*
* Register a trigger that will remove this LSM tree from
* the scheduler queues on destruction.
*/
struct trigger *trigger = malloc(sizeof(*trigger));
if (trigger == NULL) {
diag_set(OutOfMemory, sizeof(*trigger), "malloc", "trigger");
return -1;
}
trigger_create(trigger, vy_scheduler_on_delete_lsm, scheduler, NULL);
trigger_add(&lsm->on_destroy, trigger);
/*
* Add this LSM tree to the scheduler queues so that it
* can be dumped and compacted in a timely manner.
*/
vy_dump_heap_insert(&scheduler->dump_heap, lsm);
vy_compaction_heap_insert(&scheduler->compaction_heap, lsm);
return 0;
}
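/**
* Update the position of an LSM tree in the dump and compaction
* heaps after its state has changed.
*/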
static void
vy_scheduler_update_lsm(struct vy_scheduler *scheduler, struct vy_lsm *lsm)
{
assert(! heap_node_is_stray(&lsm->in_dump));
assert(! heap_node_is_stray(&lsm->in_compaction));
vy_dump_heap_update(&scheduler->dump_heap, lsm);
vy_compaction_heap_update(&scheduler->compaction_heap, lsm);
}
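/**
* Pin an LSM tree so that it is not selected for dump. Used to
* postpone dumping the primary index until all secondary indexes
* of the same space have been dumped, see vy_task_dump_new().
*/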
static void
vy_scheduler_pin_lsm(struct vy_scheduler *scheduler, struct vy_lsm *lsm)
{
assert(!lsm->is_dumping);
if (lsm->pin_count++ == 0)
vy_scheduler_update_lsm(scheduler, lsm);
}
static void
vy_scheduler_unpin_lsm(struct vy_scheduler *scheduler, struct vy_lsm *lsm)
{
assert(!lsm->is_dumping);
assert(lsm->pin_count > 0);
if (--lsm->pin_count == 0)
vy_scheduler_update_lsm(scheduler, lsm);
}
void
vy_scheduler_trigger_dump(struct vy_scheduler *scheduler)
{
if (vy_scheduler_dump_in_progress(scheduler)) {
/* Dump is already in progress, nothing to do. */
return;
}
if (scheduler->checkpoint_in_progress) {
/*
* Do not trigger another dump until checkpoint
* is complete so as to make sure no statements
* inserted after WAL rotation are written to
* the snapshot.
*/
scheduler->dump_pending = true;
return;
}
scheduler->dump_start = ev_monotonic_now(loop());
scheduler->generation++;
scheduler->dump_pending = false;
fiber_cond_signal(&scheduler->scheduler_cond);
}
int
vy_scheduler_dump(struct vy_scheduler *scheduler)
{
/*
* We must not start dump if checkpoint is in progress
* so first wait for checkpoint to complete.
*/
while (scheduler->checkpoint_in_progress)
fiber_cond_wait(&scheduler->dump_cond);
/* Trigger dump. */
if (!vy_scheduler_dump_in_progress(scheduler))
scheduler->dump_start = ev_monotonic_now(loop());
scheduler->generation++;
fiber_cond_signal(&scheduler->scheduler_cond);
/* Wait for dump to complete. */
while (vy_scheduler_dump_in_progress(scheduler)) {
if (scheduler->is_throttled) {
/* Dump error occurred. */
struct error *e = diag_last_error(&scheduler->diag);
diag_set_error(diag_get(), e);
return -1;
}
fiber_cond_wait(&scheduler->dump_cond);
}
return 0;
}
void
vy_scheduler_force_compaction(struct vy_scheduler *scheduler,
struct vy_lsm *lsm)
{
vy_lsm_force_compaction(lsm);
vy_scheduler_update_lsm(scheduler, lsm);
fiber_cond_signal(&scheduler->scheduler_cond);
}
/**
* Check whether the current dump round is complete.
* If it is, free memory and proceed to the next dump round.
*/
static void
vy_scheduler_complete_dump(struct vy_scheduler *scheduler)
{
assert(scheduler->dump_generation < scheduler->generation);
if (scheduler->dump_task_count > 0) {
/*
* There are still dump tasks in progress,
* the dump round can't be over yet.
*/
return;
}
int64_t min_generation = scheduler->generation;
struct vy_lsm *lsm = vy_dump_heap_top(&scheduler->dump_heap);
if (lsm != NULL)
min_generation = vy_lsm_generation(lsm);
if (min_generation == scheduler->dump_generation) {
/*
* There are still LSM trees that must be dumped
* during the current dump round.
*/
return;
}
/*
* The oldest LSM tree data is newer than @dump_generation,
* so the current dump round has been finished. Notify about
* dump completion.
*/
double now = ev_monotonic_now(loop());
double dump_duration = now - scheduler->dump_start;
scheduler->dump_start = now;
scheduler->dump_generation = min_generation;
scheduler->stat.dump_count++;
scheduler->dump_complete_cb(scheduler,
min_generation - 1, dump_duration);
fiber_cond_signal(&scheduler->dump_cond);
}
int
vy_scheduler_begin_checkpoint(struct vy_scheduler *scheduler, bool is_scheduled)
{
assert(!scheduler->checkpoint_in_progress);
/*
* If checkpoint is manually launched (via box.snapshot())
* then ignore throttling and force dump process. Otherwise,
* if the scheduler is throttled due to errors, do not wait
* until it wakes up as it may take quite a while. Instead
* fail checkpoint immediately with the last error seen by
* the scheduler.
*/
if (scheduler->is_throttled) {
if (is_scheduled) {
struct error *e = diag_last_error(&scheduler->diag);
diag_set_error(diag_get(), e);
say_error("cannot checkpoint vinyl, "
"scheduler is throttled with: %s", e->errmsg);
return -1;
}
say_info("scheduler is unthrottled due to manual checkpoint "
"process");
scheduler->is_throttled = false;
}
if (!vy_scheduler_dump_in_progress(scheduler)) {
/*
* We are about to start a new dump round.
* Remember the current time so that we can update
* dump bandwidth when the dump round is complete
* (see vy_scheduler_complete_dump()).
*/
scheduler->dump_start = ev_monotonic_now(loop());
}
scheduler->generation++;
scheduler->checkpoint_in_progress = true;
fiber_cond_signal(&scheduler->scheduler_cond);
say_info("vinyl checkpoint started");
return 0;
}
int
vy_scheduler_wait_checkpoint(struct vy_scheduler *scheduler)
{
if (!scheduler->checkpoint_in_progress)
return 0;
/*
* Wait until all in-memory trees created before
* checkpoint started have been dumped.
*/
while (vy_scheduler_dump_in_progress(scheduler)) {
if (scheduler->is_throttled) {
/* A dump error occurred, abort checkpoint. */
struct error *e = diag_last_error(&scheduler->diag);
diag_set_error(diag_get(), e);
say_error("vinyl checkpoint failed: %s", e->errmsg);
return -1;
}
fiber_cond_wait(&scheduler->dump_cond);
}
say_info("vinyl checkpoint completed");
return 0;
}
void
vy_scheduler_end_checkpoint(struct vy_scheduler *scheduler)
{
if (!scheduler->checkpoint_in_progress)
return;
scheduler->checkpoint_in_progress = false;
if (scheduler->dump_pending) {
/*
* Dump was triggered while checkpoint was
* in progress and hence it was postponed.
* Schedule it now.
*/
vy_scheduler_trigger_dump(scheduler);
}
}
/**
* Allocate a new run for an LSM tree and write the information
* about it to the metadata log so that we could still find
* and delete it in case a write error occurred. This function
* is called from dump/compaction task constructor.
*/
static struct vy_run *
vy_run_prepare(struct vy_run_env *run_env, struct vy_lsm *lsm)
{
struct vy_run *run = vy_run_new(run_env, vy_log_next_id());
if (run == NULL)
return NULL;
vy_log_tx_begin();
vy_log_prepare_run(lsm->id, run->id);
if (vy_log_tx_commit() < 0) {
vy_run_unref(run);
return NULL;
}
return run;
}
/**
* Free an incomplete run and write a record to the metadata
* log indicating that the run is not needed any more.
* This function is called on dump/compaction task abort.
*/
static void
vy_run_discard(struct vy_run *run)
{
int64_t run_id = run->id;
vy_run_unref(run);
ERROR_INJECT(ERRINJ_VY_RUN_DISCARD,
{say_error("error injection: run %lld not discarded",
(long long)run_id); return;});
vy_log_tx_begin();
/*
* The run hasn't been used and can be deleted right away
* so set gc_lsn to the minimal possible value (0).
*/
vy_log_drop_run(run_id, 0);
/*
* Leave the record in the vylog buffer on disk error.
* If we fail to flush it before restart, we will delete
* the run file upon recovery completion.
*/
vy_log_tx_try_commit();
}
/**
* Encode and write a single deferred DELETE statement to
* _vinyl_deferred_delete system space. The rest will be
* done by the space trigger.
*/
static int
vy_deferred_delete_process_one(struct space *deferred_delete_space,
uint32_t space_id, struct tuple_format *format,
struct vy_deferred_delete_stmt *stmt)
{
int64_t lsn = vy_stmt_lsn(stmt->new_stmt);
struct tuple *delete;
delete = vy_stmt_new_surrogate_delete(format, stmt->old_stmt);
if (delete == NULL)
return -1;
uint32_t delete_data_size;
const char *delete_data = tuple_data_range(delete, &delete_data_size);
size_t buf_size = (mp_sizeof_array(3) + mp_sizeof_uint(space_id) +
mp_sizeof_uint(lsn) + delete_data_size);
char *data = region_alloc(&fiber()->gc, buf_size);
if (data == NULL) {
diag_set(OutOfMemory, buf_size, "region", "buf");
tuple_unref(delete);
return -1;
}
char *data_end = data;
data_end = mp_encode_array(data_end, 3);
data_end = mp_encode_uint(data_end, space_id);
data_end = mp_encode_uint(data_end, lsn);
memcpy(data_end, delete_data, delete_data_size);
data_end += delete_data_size;
assert(data_end <= data + buf_size);
struct request request;
memset(&request, 0, sizeof(request));
request.type = IPROTO_REPLACE;
request.space_id = BOX_VINYL_DEFERRED_DELETE_ID;
request.tuple = data;
request.tuple_end = data_end;
tuple_unref(delete);
struct txn *txn = in_txn();
if (txn_begin_stmt(txn, deferred_delete_space) != 0)
return -1;
struct tuple *unused;
if (space_execute_dml(deferred_delete_space, txn,
&request, &unused) != 0) {
txn_rollback_stmt(txn);
return -1;
}
return txn_commit_stmt(txn, &request);
}
/**
* Callback invoked by the tx thread to process deferred DELETE
* statements generated during compaction. It writes deferred
* DELETEs to a special system space, _vinyl_deferred_delete.
* The system space has an on_replace trigger installed which
* propagates the DELETEs to secondary indexes. This way, even
* if a deferred DELETE isn't dumped to disk by vinyl, it can
* still be recovered from the WAL.
*/
static void
vy_deferred_delete_batch_process_f(struct cmsg *cmsg)
{
struct vy_deferred_delete_batch *batch = container_of(cmsg,
struct vy_deferred_delete_batch, cmsg);
struct vy_task *task = batch->task;
struct vy_lsm *pk = task->lsm;
assert(pk->index_id == 0);
/*
* A space can be dropped while a compaction task
* is in progress.
*/
if (pk->is_dropped)
return;
struct space *deferred_delete_space;
deferred_delete_space = space_by_id(BOX_VINYL_DEFERRED_DELETE_ID);
assert(deferred_delete_space != NULL);
struct txn *txn = txn_begin();
if (txn == NULL)
goto fail;
for (int i = 0; i < batch->count; i++) {
if (vy_deferred_delete_process_one(deferred_delete_space,
pk->space_id, pk->mem_format,
&batch->stmt[i]) != 0) {
goto fail_rollback;
}
}
if (txn_commit(txn) != 0)
goto fail;
fiber_gc();
return;
fail_rollback:
txn_rollback(txn);
fiber_gc();
fail:
batch->is_failed = true;
diag_move(diag_get(), &batch->diag);
}
/**
* Callback invoked by a worker thread to free processed deferred
* DELETE statements. It must be done on behalf of the worker thread
* that generated those DELETEs, because a vinyl statement cannot
* be allocated and freed in different threads.
*/
static void
vy_deferred_delete_batch_free_f(struct cmsg *cmsg)
{
struct vy_deferred_delete_batch *batch = container_of(cmsg,
struct vy_deferred_delete_batch, cmsg);
struct vy_task *task = batch->task;
for (int i = 0; i < batch->count; i++) {
struct vy_deferred_delete_stmt *stmt = &batch->stmt[i];
vy_stmt_unref_if_possible(stmt->old_stmt);
vy_stmt_unref_if_possible(stmt->new_stmt);
}
/*
* Abort the task if the tx thread failed to process
* the batch unless it has already been aborted.
*/
if (batch->is_failed && !task->is_failed) {
assert(!diag_is_empty(&batch->diag));
diag_move(&batch->diag, &task->diag);
task->is_failed = true;
fiber_cancel(task->fiber);
}
diag_destroy(&batch->diag);
free(batch);
/* Notify the caller if this is the last batch. */
assert(task->deferred_delete_in_progress > 0);
if (--task->deferred_delete_in_progress == 0)
fiber_wakeup(task->fiber);
}
/**
* Send all deferred DELETEs accumulated by a vinyl task to
* the tx thread where they will be processed.
*/
static void
vy_task_deferred_delete_flush(struct vy_task *task)
{
struct vy_worker *worker = task->worker;
struct vy_deferred_delete_batch *batch = task->deferred_delete_batch;
if (batch == NULL)
return;
task->deferred_delete_batch = NULL;
task->deferred_delete_in_progress++;
cmsg_init(&batch->cmsg, worker->deferred_delete_route);
cpipe_push(&worker->tx_pipe, &batch->cmsg);
}
/**
* Add a deferred DELETE to a batch. Once the batch gets full,
* submit it to tx where it will get processed.
*/
static int
vy_task_deferred_delete_process(struct vy_deferred_delete_handler *handler,
struct tuple *old_stmt, struct tuple *new_stmt)
{
enum { MAX_IN_PROGRESS = 10 };
struct vy_task *task = container_of(handler, struct vy_task,
deferred_delete_handler);
struct vy_deferred_delete_batch *batch = task->deferred_delete_batch;
/*
* Throttle compaction task if there are too many batches
* being processed so as to limit memory consumption.
*/
while (task->deferred_delete_in_progress >= MAX_IN_PROGRESS)
fiber_sleep(TIMEOUT_INFINITY);
/* Allocate a new batch on demand. */
if (batch == NULL) {
batch = malloc(sizeof(*batch));
if (batch == NULL) {
diag_set(OutOfMemory, sizeof(*batch), "malloc",
"struct vy_deferred_delete_batch");
return -1;
}
memset(batch, 0, sizeof(*batch));
batch->task = task;
diag_create(&batch->diag);
task->deferred_delete_batch = batch;
}
assert(batch->count < VY_DEFERRED_DELETE_BATCH_MAX);
struct vy_deferred_delete_stmt *stmt = &batch->stmt[batch->count++];
stmt->old_stmt = old_stmt;
vy_stmt_ref_if_possible(old_stmt);
stmt->new_stmt = new_stmt;
vy_stmt_ref_if_possible(new_stmt);
if (batch->count == VY_DEFERRED_DELETE_BATCH_MAX)
vy_task_deferred_delete_flush(task);
return 0;
}
/**
* Wait until all pending deferred DELETE statements have been
* processed by tx. Called when the write iterator stops.
*/
static void
vy_task_deferred_delete_destroy(struct vy_deferred_delete_handler *handler)
{
struct vy_task *task = container_of(handler, struct vy_task,
deferred_delete_handler);
vy_task_deferred_delete_flush(task);
while (task->deferred_delete_in_progress > 0)
fiber_sleep(TIMEOUT_INFINITY);
}
static const struct vy_deferred_delete_handler_iface
vy_task_deferred_delete_iface = {
.process = vy_task_deferred_delete_process,
.destroy = vy_task_deferred_delete_destroy,
};
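/**
* Write statements produced by the task's write iterator to the
* task's new run file. Common helper for dump and compaction
* tasks, executed in a worker thread.
*/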
static int
vy_task_write_run(struct vy_task *task, bool no_compression)
{
enum { YIELD_LOOPS = 32 };
struct vy_lsm *lsm = task->lsm;
struct vy_stmt_stream *wi = task->wi;
ERROR_INJECT(ERRINJ_VY_RUN_WRITE,
{diag_set(ClientError, ER_INJECTION,
"vinyl dump"); return -1;});
ERROR_INJECT_SLEEP(ERRINJ_VY_RUN_WRITE_DELAY);
struct vy_run_writer writer;
if (vy_run_writer_create(&writer, task->new_run, lsm->env->path,
lsm->space_id, lsm->index_id,
task->cmp_def, task->key_def,
task->page_size, task->bloom_fpr,
no_compression) != 0)
goto fail;
if (wi->iface->start(wi) != 0)
goto fail_abort_writer;
int rc;
int loops = 0;
struct vy_entry entry = vy_entry_none();
while ((rc = wi->iface->next(wi, &entry)) == 0 && entry.stmt != NULL) {
struct errinj *inj = errinj(ERRINJ_VY_RUN_WRITE_STMT_TIMEOUT,
ERRINJ_DOUBLE);
if (inj != NULL && inj->dparam > 0)
thread_sleep(inj->dparam);
rc = vy_run_writer_append_stmt(&writer, entry);
if (rc != 0)
break;
if (++loops % YIELD_LOOPS == 0)
fiber_sleep(0);
if (fiber_is_cancelled()) {
diag_set(FiberIsCancelled);
rc = -1;
break;
}
}
wi->iface->stop(wi);
if (rc == 0)
rc = vy_run_writer_commit(&writer);
if (rc != 0)
goto fail_abort_writer;
return 0;
fail_abort_writer:
vy_run_writer_abort(&writer);
fail:
return -1;
}
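/** Dump task ->execute: write in-memory trees to a new run file. */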
static int
vy_task_dump_execute(struct vy_task *task)
{
ERROR_INJECT_SLEEP(ERRINJ_VY_DUMP_DELAY);
/*
* Don't compress L1 runs: they are the most frequently read
* and at the same time the smallest runs, so we would gain
* nothing by compressing them.
*/
return vy_task_write_run(task, true);
}
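/**
* Dump task ->complete: log the dump, add slices of the new run
* to the intersecting ranges and delete the dumped in-memory
* trees.
*/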
static int
vy_task_dump_complete(struct vy_task *task)
{
struct vy_scheduler *scheduler = task->scheduler;
struct vy_lsm *lsm = task->lsm;
struct vy_run *new_run = task->new_run;
int64_t dump_lsn = new_run->dump_lsn;
double dump_time = ev_monotonic_now(loop()) - task->start_time;
struct vy_disk_stmt_counter dump_output = new_run->count;
struct vy_stmt_counter dump_input;
struct vy_mem *mem, *next_mem;
struct vy_slice **new_slices, *slice;
struct vy_range *range, *begin_range, *end_range;
int i;
assert(lsm->is_dumping);
if (vy_run_is_empty(new_run)) {
/*
* In case the run is empty, we can discard the run
* and delete dumped in-memory trees right away w/o
* inserting slices into ranges. However, we need
* to log the LSM tree dump anyway.
*/
vy_log_tx_begin();
vy_log_dump_lsm(lsm->id, dump_lsn);
if (vy_log_tx_commit() < 0)
goto fail;
vy_run_discard(new_run);
goto delete_mems;
}
assert(new_run->info.max_lsn <= dump_lsn);
/*
* Figure out which ranges intersect the new run.
*/
if (vy_lsm_find_range_intersection(lsm, new_run->info.min_key,
new_run->info.max_key,
&begin_range, &end_range) != 0)
goto fail;
/*
* For each intersected range allocate a slice of the new run.
*/
new_slices = calloc(lsm->range_count, sizeof(*new_slices));
if (new_slices == NULL) {
diag_set(OutOfMemory, lsm->range_count * sizeof(*new_slices),
"malloc", "struct vy_slice *");
goto fail;
}
for (range = begin_range, i = 0; range != end_range;
range = vy_range_tree_next(&lsm->range_tree, range), i++) {
slice = vy_slice_new(vy_log_next_id(), new_run,
range->begin, range->end, lsm->cmp_def);
if (slice == NULL)
goto fail_free_slices;
assert(i < lsm->range_count);
new_slices[i] = slice;
}
/*
* Log change in metadata.
*/
vy_log_tx_begin();
vy_log_create_run(lsm->id, new_run->id, dump_lsn, new_run->dump_count);
for (range = begin_range, i = 0; range != end_range;
range = vy_range_tree_next(&lsm->range_tree, range), i++) {
assert(i < lsm->range_count);
slice = new_slices[i];
vy_log_insert_slice(range->id, new_run->id, slice->id,
tuple_data_or_null(slice->begin.stmt),
tuple_data_or_null(slice->end.stmt));
}
vy_log_dump_lsm(lsm->id, dump_lsn);
if (vy_log_tx_commit() < 0)
goto fail_free_slices;
/* Account the new run. */
vy_lsm_add_run(lsm, new_run);
/* Drop the reference held by the task. */
vy_run_unref(new_run);
/*
* Add new slices to ranges.
*
* Note, we must not yield after this point, because if we
* do, a concurrent read iterator may see an inconsistent
* LSM tree state, when the same statement is present twice,
* in memory and on disk.
*/
for (range = begin_range, i = 0; range != end_range;
range = vy_range_tree_next(&lsm->range_tree, range), i++) {
assert(i < lsm->range_count);
slice = new_slices[i];
vy_lsm_unacct_range(lsm, range);
vy_range_add_slice(range, slice);
vy_range_update_compaction_priority(range, &lsm->opts);
vy_range_update_dumps_per_compaction(range);
vy_lsm_acct_range(lsm, range);
}
vy_range_heap_update_all(&lsm->range_heap);
free(new_slices);
delete_mems:
/*
* Delete dumped in-memory trees and account dump in
* LSM tree statistics.
*/
vy_stmt_counter_reset(&dump_input);
rlist_foreach_entry_safe(mem, &lsm->sealed, in_sealed, next_mem) {
if (mem->generation > scheduler->dump_generation)
continue;
vy_stmt_counter_add(&dump_input, &mem->count);
vy_lsm_delete_mem(lsm, mem);
}
lsm->dump_lsn = MAX(lsm->dump_lsn, dump_lsn);
vy_lsm_acct_dump(lsm, dump_time, &dump_input, &dump_output);
/*
* Indexes of the same space share a memory level so we
* account dump input only when the primary index is dumped.
*/
if (lsm->index_id == 0)
scheduler->stat.dump_input += dump_input.bytes;
scheduler->stat.dump_output += dump_output.bytes;
scheduler->stat.dump_time += dump_time;
/* The iterator has been cleaned up in a worker thread. */
task->wi->iface->close(task->wi);
lsm->is_dumping = false;
vy_scheduler_update_lsm(scheduler, lsm);
if (lsm->index_id != 0)
vy_scheduler_unpin_lsm(scheduler, lsm->pk);
assert(scheduler->dump_task_count > 0);
scheduler->dump_task_count--;
say_info("%s: dump completed", vy_lsm_name(lsm));
vy_scheduler_complete_dump(scheduler);
return 0;
fail_free_slices:
for (i = 0; i < lsm->range_count; i++) {
slice = new_slices[i];
if (slice != NULL)
vy_slice_delete(slice);
}
free(new_slices);
fail:
return -1;
}
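/**
* Dump task ->abort: discard the new run and make the LSM tree
* available for dump again.
*/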
static void
vy_task_dump_abort(struct vy_task *task)
{
struct vy_scheduler *scheduler = task->scheduler;
struct vy_lsm *lsm = task->lsm;
assert(lsm->is_dumping);
/* The iterator has been cleaned up in a worker thread. */
task->wi->iface->close(task->wi);
struct error *e = diag_last_error(&task->diag);
error_log(e);
say_error("%s: dump failed", vy_lsm_name(lsm));
vy_run_discard(task->new_run);
lsm->is_dumping = false;
vy_scheduler_update_lsm(scheduler, lsm);
if (lsm->index_id != 0)
vy_scheduler_unpin_lsm(scheduler, lsm->pk);
assert(scheduler->dump_task_count > 0);
scheduler->dump_task_count--;
}
/**
* Create a task to dump an LSM tree.
*
* On success the task is supposed to dump all in-memory
* trees created at @scheduler->dump_generation.
*/
static int
vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_worker *worker,
struct vy_lsm *lsm, struct vy_task **p_task)
{
static struct vy_task_ops dump_ops = {
.execute = vy_task_dump_execute,
.complete = vy_task_dump_complete,
.abort = vy_task_dump_abort,
};
assert(!lsm->is_dumping);
assert(lsm->pin_count == 0);
assert(vy_lsm_generation(lsm) == scheduler->dump_generation);
assert(scheduler->dump_generation < scheduler->generation);
struct errinj *inj = errinj(ERRINJ_VY_INDEX_DUMP, ERRINJ_INT);
if (inj != NULL && inj->iparam == (int)lsm->index_id) {
diag_set(ClientError, ER_INJECTION, "vinyl index dump");
goto err;
}
/* Rotate the active tree if it needs to be dumped. */
if (lsm->mem->generation == scheduler->dump_generation &&
vy_lsm_rotate_mem(lsm) != 0)
goto err;
/*
* Wait until all active writes to in-memory trees
* eligible for dump are over.
*/
int64_t dump_lsn = -1;
struct vy_mem *mem, *next_mem;
rlist_foreach_entry_safe(mem, &lsm->sealed, in_sealed, next_mem) {
if (mem->generation > scheduler->dump_generation)
continue;
vy_mem_wait_pinned(mem);
if (mem->tree.size == 0) {
/*
* The tree is empty so we can delete it
* right away, without involving a worker.
*/
vy_lsm_delete_mem(lsm, mem);
continue;
}
dump_lsn = MAX(dump_lsn, mem->dump_lsn);
}
if (dump_lsn < 0) {
/* Nothing to do, pick another LSM tree. */
vy_scheduler_update_lsm(scheduler, lsm);
vy_scheduler_complete_dump(scheduler);
return 0;
}
struct vy_task *task = vy_task_new(scheduler, worker, lsm, &dump_ops);
if (task == NULL)
goto err;
struct vy_run *new_run = vy_run_prepare(scheduler->run_env, lsm);
if (new_run == NULL)
goto err_run;
new_run->dump_count = 1;
new_run->dump_lsn = dump_lsn;
/*
* Note, deferred DELETEs are generated on transaction commit
* when the overwritten tuple is found in memory, so no deferred
* DELETE statement should be generated during dump and we don't
* pass a deferred DELETE handler.
*/
struct vy_stmt_stream *wi;
bool is_last_level = (lsm->run_count == 0);
wi = vy_write_iterator_new(task->cmp_def, lsm->index_id == 0,
is_last_level, scheduler->read_views, NULL);
if (wi == NULL)
goto err_wi;
rlist_foreach_entry(mem, &lsm->sealed, in_sealed) {
if (mem->generation > scheduler->dump_generation)
continue;
if (vy_write_iterator_new_mem(wi, mem) != 0)
goto err_wi_sub;
}
task->new_run = new_run;
task->wi = wi;
task->bloom_fpr = lsm->opts.bloom_fpr;
task->page_size = lsm->opts.page_size;
lsm->is_dumping = true;
vy_scheduler_update_lsm(scheduler, lsm);
if (lsm->index_id != 0) {
/*
* The primary index LSM tree must be dumped after
* all secondary index LSM trees of the same space,
* see vy_dump_heap_less(). To make sure it isn't
* picked by the scheduler while all secondary index
* LSM trees are being dumped, temporarily remove
* it from the dump heap.
*/
vy_scheduler_pin_lsm(scheduler, lsm->pk);
}
scheduler->dump_task_count++;
say_info("%s: dump started", vy_lsm_name(lsm));
*p_task = task;
return 0;
err_wi_sub:
task->wi->iface->close(wi);
err_wi:
vy_run_discard(new_run);
err_run:
vy_task_delete(task);
err:
diag_log();
say_error("%s: could not start dump", vy_lsm_name(lsm));
return -1;
}
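/** Compaction task ->execute: merge the selected slices into a new run. */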
static int
vy_task_compaction_execute(struct vy_task *task)
{
ERROR_INJECT_SLEEP(ERRINJ_VY_COMPACTION_DELAY);
return vy_task_write_run(task, false);
}
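/**
* Compaction task ->complete: log the changes, replace the
* compacted slices with a slice of the new run and delete the
* runs that became unused.
*/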
static int
vy_task_compaction_complete(struct vy_task *task)
{
struct vy_scheduler *scheduler = task->scheduler;
struct vy_lsm *lsm = task->lsm;
struct vy_range *range = task->range;
struct vy_run *new_run = task->new_run;
double compaction_time = ev_monotonic_now(loop()) - task->start_time;
struct vy_disk_stmt_counter compaction_output = new_run->count;
struct vy_disk_stmt_counter compaction_input;
struct vy_slice *first_slice = task->first_slice;
struct vy_slice *last_slice = task->last_slice;
struct vy_slice *slice, *next_slice, *new_slice = NULL;
struct vy_run *run;
/*
* Allocate a slice of the new run.
*
* If the run is empty, we don't need to allocate a new slice
* and insert it into the range, but we still need to delete
* compacted runs.
*/
if (!vy_run_is_empty(new_run)) {
new_slice = vy_slice_new(vy_log_next_id(), new_run,
vy_entry_none(), vy_entry_none(),
lsm->cmp_def);
if (new_slice == NULL)
return -1;
}
/*
* Build the list of runs that became unused
* as a result of compaction.
*/
RLIST_HEAD(unused_runs);
for (slice = first_slice; ; slice = rlist_next_entry(slice, in_range)) {
slice->run->compacted_slice_count++;
if (slice == last_slice)
break;
}
for (slice = first_slice; ; slice = rlist_next_entry(slice, in_range)) {
run = slice->run;
if (run->compacted_slice_count == run->slice_count)
rlist_add_entry(&unused_runs, run, in_unused);
slice->run->compacted_slice_count = 0;
if (slice == last_slice)
break;
}
/*
* Log change in metadata.
*/
vy_log_tx_begin();
for (slice = first_slice; ; slice = rlist_next_entry(slice, in_range)) {
vy_log_delete_slice(slice->id);
if (slice == last_slice)
break;
}
rlist_foreach_entry(run, &unused_runs, in_unused)
vy_log_drop_run(run->id, VY_LOG_GC_LSN_CURRENT);
if (new_slice != NULL) {
vy_log_create_run(lsm->id, new_run->id, new_run->dump_lsn,
new_run->dump_count);
vy_log_insert_slice(range->id, new_run->id, new_slice->id,
tuple_data_or_null(new_slice->begin.stmt),
tuple_data_or_null(new_slice->end.stmt));
}
if (vy_log_tx_commit() < 0) {
if (new_slice != NULL)
vy_slice_delete(new_slice);
return -1;
}
/*
* Remove compacted run files that were created after
* the last checkpoint (and hence are not referenced
* by any checkpoint) immediately to save disk space.
*
* Don't bother logging it to avoid a potential race
* with a garbage collection task, which may be cleaning
* up concurrently. The log will be cleaned up on the
* next checkpoint.
*/
rlist_foreach_entry(run, &unused_runs, in_unused) {
if (run->dump_lsn > vy_log_signature())
vy_run_remove_files(lsm->env->path, lsm->space_id,
lsm->index_id, run->id);
}
/*
* Account the new run if it is not empty,
* otherwise discard it.
*/
if (new_slice != NULL) {
vy_lsm_add_run(lsm, new_run);
/* Drop the reference held by the task. */
vy_run_unref(new_run);
} else
vy_run_discard(new_run);
/*
* Replace compacted slices with the resulting slice and
* account compaction in LSM tree statistics.
*
* Note, since a slice might have been added to the range
* by a concurrent dump while compaction was in progress,
* we must insert the new slice at the same position where
* the compacted slices were.
*/
RLIST_HEAD(compacted_slices);
vy_lsm_unacct_range(lsm, range);
if (new_slice != NULL)
vy_range_add_slice_before(range, new_slice, first_slice);
vy_disk_stmt_counter_reset(&compaction_input);
for (slice = first_slice; ; slice = next_slice) {
next_slice = rlist_next_entry(slice, in_range);
vy_range_remove_slice(range, slice);
rlist_add_entry(&compacted_slices, slice, in_range);
vy_disk_stmt_counter_add(&compaction_input, &slice->count);
if (slice == last_slice)
break;
}
range->n_compactions++;
vy_range_update_compaction_priority(range, &lsm->opts);
vy_range_update_dumps_per_compaction(range);
vy_lsm_acct_range(lsm, range);
vy_lsm_acct_compaction(lsm, compaction_time,
&compaction_input, &compaction_output);
scheduler->stat.compaction_input += compaction_input.bytes;
scheduler->stat.compaction_output += compaction_output.bytes;
scheduler->stat.compaction_time += compaction_time;
/*
* Unaccount unused runs and delete compacted slices.
*/
rlist_foreach_entry(run, &unused_runs, in_unused)
vy_lsm_remove_run(lsm, run);
rlist_foreach_entry_safe(slice, &compacted_slices,
in_range, next_slice) {
vy_slice_wait_pinned(slice);
vy_slice_delete(slice);
}
/* The iterator has been cleaned up in a worker thread. */
task->wi->iface->close(task->wi);
assert(heap_node_is_stray(&range->heap_node));
vy_range_heap_insert(&lsm->range_heap, range);
vy_scheduler_update_lsm(scheduler, lsm);
say_info("%s: completed compacting range %s",
vy_lsm_name(lsm), vy_range_str(range));
return 0;
}
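/**
* Compaction task ->abort: discard the new run and return the
* range to the LSM tree's range heap.
*/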
static void
vy_task_compaction_abort(struct vy_task *task)
{
struct vy_scheduler *scheduler = task->scheduler;
struct vy_lsm *lsm = task->lsm;
struct vy_range *range = task->range;
/* The iterator has been cleaned up in a worker thread. */
task->wi->iface->close(task->wi);
struct error *e = diag_last_error(&task->diag);
error_log(e);
say_error("%s: failed to compact range %s",
vy_lsm_name(lsm), vy_range_str(range));
vy_run_discard(task->new_run);
assert(heap_node_is_stray(&range->heap_node));
vy_range_heap_insert(&lsm->range_heap, range);
vy_scheduler_update_lsm(scheduler, lsm);
}
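/**
* Create a task to compact the range with the highest compaction
* priority of the given LSM tree. If the range needs to be split
* or coalesced instead, no task is created.
*/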
static int
vy_task_compaction_new(struct vy_scheduler *scheduler, struct vy_worker *worker,
struct vy_lsm *lsm, struct vy_task **p_task)
{
static struct vy_task_ops compaction_ops = {
.execute = vy_task_compaction_execute,
.complete = vy_task_compaction_complete,
.abort = vy_task_compaction_abort,
};
struct vy_range *range = vy_range_heap_top(&lsm->range_heap);
assert(range != NULL);
assert(range->compaction_priority > 1);
if (vy_lsm_split_range(lsm, range) ||
vy_lsm_coalesce_range(lsm, range)) {
vy_scheduler_update_lsm(scheduler, lsm);
return 0;
}
struct vy_task *task = vy_task_new(scheduler, worker, lsm,
&compaction_ops);
if (task == NULL)
goto err_task;
struct vy_run *new_run = vy_run_prepare(scheduler->run_env, lsm);
if (new_run == NULL)
goto err_run;
struct vy_stmt_stream *wi;
bool is_last_level = (range->compaction_priority == range->slice_count);
wi = vy_write_iterator_new(task->cmp_def, lsm->index_id == 0,
is_last_level, scheduler->read_views,
lsm->index_id > 0 ? NULL :
&task->deferred_delete_handler);
if (wi == NULL)
goto err_wi;
struct vy_slice *slice;
int32_t dump_count = 0;
int n = range->compaction_priority;
rlist_foreach_entry(slice, &range->slices, in_range) {
if (vy_write_iterator_new_slice(wi, slice,
lsm->disk_format) != 0)
goto err_wi_sub;
new_run->dump_lsn = MAX(new_run->dump_lsn,
slice->run->dump_lsn);
dump_count += slice->run->dump_count;
/* Remember the slices we are compacting. */
if (task->first_slice == NULL)
task->first_slice = slice;
task->last_slice = slice;
if (--n == 0)
break;
}
assert(n == 0);
assert(new_run->dump_lsn >= 0);
if (range->compaction_priority == range->slice_count)
dump_count -= slice->run->dump_count;
/*
* Do not update dumps_per_compaction in case compaction
* was triggered manually to avoid unexpected side effects,
* such as splitting/coalescing ranges for no good reason.
*/
if (range->needs_compaction)
new_run->dump_count = slice->run->dump_count;
else
new_run->dump_count = dump_count;
range->needs_compaction = false;
task->range = range;
task->new_run = new_run;
task->wi = wi;
task->bloom_fpr = lsm->opts.bloom_fpr;
task->page_size = lsm->opts.page_size;
/*
* Remove the range we are going to compact from the heap
* so that it doesn't get selected again.
*/
vy_range_heap_delete(&lsm->range_heap, range);
vy_scheduler_update_lsm(scheduler, lsm);
say_info("%s: started compacting range %s, runs %d/%d",
vy_lsm_name(lsm), vy_range_str(range),
range->compaction_priority, range->slice_count);
*p_task = task;
return 0;
err_wi_sub:
task->wi->iface->close(wi);
err_wi:
vy_run_discard(new_run);
err_run:
vy_task_delete(task);
err_task:
diag_log();
say_error("%s: could not start compacting range %s: %s",
vy_lsm_name(lsm), vy_range_str(range));
return -1;
}
/**
* Fiber function that actually executes a vinyl task.
* After finishing a task, it sends it back to tx.
*/
static int
vy_task_f(va_list va)
{
struct vy_task *task = va_arg(va, struct vy_task *);
struct vy_worker *worker = task->worker;
assert(task->fiber == fiber());
assert(worker->task == task);
assert(&worker->cord == cord());
if (task->ops->execute(task) != 0 && !task->is_failed) {
struct diag *diag = diag_get();
assert(!diag_is_empty(diag));
task->is_failed = true;
diag_move(diag, &task->diag);
}
cmsg_init(&task->cmsg, vy_task_complete_route);
cpipe_push(&worker->tx_pipe, &task->cmsg);
task->fiber = NULL;
worker->task = NULL;
return 0;
}
/**
* Callback invoked by a worker thread upon receiving a task.
* It schedules a fiber which actually executes the task, so
* as not to block the event loop.
*/
static void
vy_task_execute_f(struct cmsg *cmsg)
{
struct vy_task *task = container_of(cmsg, struct vy_task, cmsg);
struct vy_worker *worker = task->worker;
assert(task->fiber == NULL);
assert(worker->task == NULL);
assert(&worker->cord == cord());
task->fiber = fiber_new("task", vy_task_f);
if (task->fiber == NULL) {
task->is_failed = true;
diag_move(diag_get(), &task->diag);
cmsg_init(&task->cmsg, vy_task_complete_route);
cpipe_push(&worker->tx_pipe, &task->cmsg);
} else {
worker->task = task;
fiber_start(task->fiber, task);
}
}
/**
* Callback invoked by the tx thread upon receiving an executed
* task from a worker thread. It adds the task to the processed
* task queue and wakes up the scheduler so that it can complete
* it.
*/
static void
vy_task_complete_f(struct cmsg *cmsg)
{
struct vy_task *task = container_of(cmsg, struct vy_task, cmsg);
stailq_add_tail_entry(&task->scheduler->processed_tasks,
task, in_processed);
fiber_cond_signal(&task->scheduler->scheduler_cond);
}
/**
* Create a task for dumping an LSM tree. The new task is returned
* in @ptask. If there's no LSM tree that needs to be dumped or all
* workers are currently busy, @ptask is set to NULL.
*
* We only dump an LSM tree if it needs to be snapshotted or the quota
* on memory usage is exceeded. In either case, the oldest LSM tree
* is selected, because dumping it will free the maximal amount of
* memory due to log structured design of the memory allocator.
*
* Returns 0 on success, -1 on failure.
*/
static int
vy_scheduler_peek_dump(struct vy_scheduler *scheduler, struct vy_task **ptask)
{
struct vy_worker *worker = NULL;
retry:
*ptask = NULL;
if (!vy_scheduler_dump_in_progress(scheduler)) {
/*
* All memory trees of past generations have
* been dumped, nothing to do.
*/
goto no_task;
}
/*
* Look up the oldest LSM tree eligible for dump.
*/
struct vy_lsm *lsm = vy_dump_heap_top(&scheduler->dump_heap);
if (lsm == NULL) {
/*
* There is no LSM tree and so no task to schedule.
* Complete the current dump round.
*/
vy_scheduler_complete_dump(scheduler);
goto no_task;
}
if (!lsm->is_dumping && lsm->pin_count == 0 &&
vy_lsm_generation(lsm) == scheduler->dump_generation) {
/*
* Dump is in progress and there is an LSM tree that
* contains data that must be dumped at the current
* round. Try to create a task for it.
*/
if (worker == NULL) {
worker = vy_worker_pool_get(&scheduler->dump_pool);
if (worker == NULL)
return 0; /* all workers are busy */
}
if (vy_task_dump_new(scheduler, worker, lsm, ptask) != 0) {
vy_worker_pool_put(worker);
return -1;
}
if (*ptask != NULL)
return 0; /* new task */
/*
* All in-memory trees eligible for dump were empty
* and so were deleted without involving a worker
* thread. Check another LSM tree.
*/
goto retry;
}
/*
* Dump is in progress, but all eligible LSM trees are
* already being dumped. Wait until the current round
* is complete.
*/
assert(scheduler->dump_task_count > 0);
no_task:
if (worker != NULL)
vy_worker_pool_put(worker);
return 0;
}
/**
* Create a task for compacting a range. The new task is returned
* in @ptask. If there's no range that needs to be compacted or all
* workers are currently busy, @ptask is set to NULL.
*
* We compact ranges that have more runs in a level than specified
* by the run_count_per_level configuration option. Among those
* we give preference to the ranges whose compaction will reduce
* read amplification the most.
*
* Returns 0 on success, -1 on failure.
*/
static int
vy_scheduler_peek_compaction(struct vy_scheduler *scheduler,
struct vy_task **ptask)
{
struct vy_worker *worker = NULL;
retry:
*ptask = NULL;
struct vy_lsm *lsm = vy_compaction_heap_top(&scheduler->compaction_heap);
if (lsm == NULL)
goto no_task; /* nothing to do */
if (vy_lsm_compaction_priority(lsm) <= 1)
goto no_task; /* nothing to do */
if (worker == NULL) {
worker = vy_worker_pool_get(&scheduler->compaction_pool);
if (worker == NULL)
return 0; /* all workers are busy */
}
if (vy_task_compaction_new(scheduler, worker, lsm, ptask) != 0) {
vy_worker_pool_put(worker);
return -1;
}
if (*ptask == NULL)
goto retry; /* LSM tree dropped or range split/coalesced */
return 0; /* new task */
no_task:
if (worker != NULL)
vy_worker_pool_put(worker);
return 0;
}
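/**
* Pick the next task to run. Dump tasks are considered before
* compaction tasks. On success *ptask is set to the new task or
* to NULL if there is nothing to do.
*/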
static int
vy_schedule(struct vy_scheduler *scheduler, struct vy_task **ptask)
{
*ptask = NULL;
if (vy_scheduler_peek_dump(scheduler, ptask) != 0)
goto fail;
if (*ptask != NULL)
goto found;
if (vy_scheduler_peek_compaction(scheduler, ptask) != 0)
goto fail;
if (*ptask != NULL)
goto found;
/* no task to run */
return 0;
found:
scheduler->stat.tasks_inprogress++;
return 0;
fail:
assert(!diag_is_empty(diag_get()));
diag_move(diag_get(), &scheduler->diag);
return -1;
}
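/**
* Finalize a processed task in tx: call ->complete on success or
* ->abort on failure and update the scheduler statistics.
*/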
static int
vy_task_complete(struct vy_task *task)
{
struct vy_scheduler *scheduler = task->scheduler;
assert(scheduler->stat.tasks_inprogress > 0);
scheduler->stat.tasks_inprogress--;
struct diag *diag = &task->diag;
if (task->is_failed) {
assert(!diag_is_empty(diag));
goto fail; /* ->execute failed */
}
ERROR_INJECT(ERRINJ_VY_TASK_COMPLETE, {
diag_set(ClientError, ER_INJECTION,
"vinyl task completion");
diag_move(diag_get(), diag);
goto fail; });
if (task->ops->complete &&
task->ops->complete(task) != 0) {
assert(!diag_is_empty(diag_get()));
diag_move(diag_get(), diag);
goto fail;
}
scheduler->stat.tasks_completed++;
return 0;
fail:
if (task->ops->abort)
task->ops->abort(task);
diag_move(diag, &scheduler->diag);
scheduler->stat.tasks_failed++;
return -1;
}
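/**
* Scheduler fiber function: complete processed tasks, schedule
* new ones and throttle for a while after a failure.
*/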
static int
vy_scheduler_f(va_list va)
{
struct vy_scheduler *scheduler = va_arg(va, struct vy_scheduler *);
while (scheduler->scheduler_fiber != NULL) {
struct stailq processed_tasks;
struct vy_task *task, *next;
int tasks_failed = 0, tasks_done = 0;
/* Get the list of processed tasks. */
stailq_create(&processed_tasks);
stailq_concat(&processed_tasks, &scheduler->processed_tasks);
/* Complete and delete all processed tasks. */
stailq_foreach_entry_safe(task, next, &processed_tasks,
in_processed) {
if (vy_task_complete(task) != 0)
tasks_failed++;
else
tasks_done++;
vy_worker_pool_put(task->worker);
vy_task_delete(task);
}
/*
* Reset the timeout if we managed to successfully
* complete at least one task.
*/
if (tasks_done > 0) {
scheduler->timeout = 0;
/*
* Task completion callback may yield, which
* opens a time window for a worker to submit
* a processed task and wake up the scheduler
* (via scheduler_async). Hence we should go
* and recheck the processed_tasks in order not
* to lose a wakeup event and hang for good.
*/
continue;
}
/* Throttle for a while if a task failed. */
if (tasks_failed > 0)
goto error;
/* Get a task to schedule. */
if (vy_schedule(scheduler, &task) != 0)
goto error;
/* Nothing to do or all workers are busy. */
if (task == NULL) {
/* Wait for changes. */
fiber_cond_wait(&scheduler->scheduler_cond);
continue;
}
/* Queue the task for execution. */
cmsg_init(&task->cmsg, vy_task_execute_route);
cpipe_push(&task->worker->worker_pipe, &task->cmsg);
fiber_reschedule();
continue;
error:
/* Abort pending checkpoint. */
fiber_cond_signal(&scheduler->dump_cond);
/*
* A task can fail either due to lack of memory or IO
* error. In either case it is pointless to schedule
* another task right away, because it is likely to fail
* too. So we throttle the scheduler for a while after
* each failure.
*/
scheduler->timeout *= 2;
if (scheduler->timeout < VY_SCHEDULER_TIMEOUT_MIN)
scheduler->timeout = VY_SCHEDULER_TIMEOUT_MIN;
if (scheduler->timeout > VY_SCHEDULER_TIMEOUT_MAX)
scheduler->timeout = VY_SCHEDULER_TIMEOUT_MAX;
struct errinj *inj;
inj = errinj(ERRINJ_VY_SCHED_TIMEOUT, ERRINJ_DOUBLE);
if (inj != NULL && inj->dparam != 0)
scheduler->timeout = inj->dparam;
say_warn("throttling scheduler for %.0f second(s)",
scheduler->timeout);
scheduler->is_throttled = true;
fiber_sleep(scheduler->timeout);
scheduler->is_throttled = false;
}
return 0;
}
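/**
* Worker thread main function: process cbus messages until the
* thread is cancelled on scheduler shutdown.
*/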
static int
vy_worker_f(va_list ap)
{
struct vy_worker *worker = va_arg(ap, struct vy_worker *);
struct cbus_endpoint endpoint;
cpipe_create(&worker->tx_pipe, "tx");
cbus_endpoint_create(&endpoint, cord_name(&worker->cord),
fiber_schedule_cb, fiber());
cbus_loop(&endpoint);
/*
* Cancel the task that is currently being executed by
* this worker and join the fiber before destroying
* the pipe to make sure it doesn't access freed memory.
*/
if (worker->task != NULL) {
struct fiber *fiber = worker->task->fiber;
assert(fiber != NULL);
assert(!fiber_is_dead(fiber));
fiber_set_joinable(fiber, true);
fiber_cancel(fiber);
fiber_join(fiber);
}
cbus_endpoint_destroy(&endpoint, cbus_process);
cpipe_destroy(&worker->tx_pipe);
return 0;
}