Skip to content
Snippets Groups Projects
Commit 1331d232 authored by Vladimir Davydov's avatar Vladimir Davydov
Browse files

vinyl: store pointer to scheduler in struct vy_task

Currently, we don't really need it, but once we switch communication
channel between the scheduler and workers from pthread mutex/cond to
cbus (needed for #2129), tasks won't be completed on behalf of the
scheduler fiber and hence we will need a back pointer from vy_task to
vy_scheduler.

Needed for #2129
parent 15c28b75
No related branches found
No related tags found
No related merge requests found
......@@ -72,24 +72,27 @@ struct vy_task_ops {
* which is too heavy for the tx thread (like IO or compression).
* Returns 0 on success. On failure returns -1 and sets diag.
*/
int (*execute)(struct vy_scheduler *scheduler, struct vy_task *task);
int (*execute)(struct vy_task *task);
/**
* This function is called by the scheduler upon task completion.
* It may be used to finish the task from the tx thread context.
*
* Returns 0 on success. On failure returns -1 and sets diag.
*/
int (*complete)(struct vy_scheduler *scheduler, struct vy_task *task);
int (*complete)(struct vy_task *task);
/**
* This function is called by the scheduler if either ->execute
* or ->complete failed. It may be used to undo changes done to
* the LSM tree when preparing the task.
*/
void (*abort)(struct vy_scheduler *scheduler, struct vy_task *task);
void (*abort)(struct vy_task *task);
};
struct vy_task {
/** Virtual method table. */
const struct vy_task_ops *ops;
/** Pointer to the scheduler. */
struct vy_scheduler *scheduler;
/** Return code of ->execute. */
int status;
/** If ->execute fails, the error is stored here. */
......@@ -138,10 +141,10 @@ struct vy_task {
* does not free it from under us.
*/
static struct vy_task *
vy_task_new(struct mempool *pool, struct vy_lsm *lsm,
vy_task_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm,
const struct vy_task_ops *ops)
{
struct vy_task *task = mempool_alloc(pool);
struct vy_task *task = mempool_alloc(&scheduler->task_pool);
if (task == NULL) {
diag_set(OutOfMemory, sizeof(*task),
"mempool", "struct vy_task");
......@@ -149,16 +152,17 @@ vy_task_new(struct mempool *pool, struct vy_lsm *lsm,
}
memset(task, 0, sizeof(*task));
task->ops = ops;
task->scheduler = scheduler;
task->lsm = lsm;
task->cmp_def = key_def_dup(lsm->cmp_def);
if (task->cmp_def == NULL) {
mempool_free(pool, task);
mempool_free(&scheduler->task_pool, task);
return NULL;
}
task->key_def = key_def_dup(lsm->key_def);
if (task->key_def == NULL) {
key_def_delete(task->cmp_def);
mempool_free(pool, task);
mempool_free(&scheduler->task_pool, task);
return NULL;
}
vy_lsm_ref(lsm);
......@@ -168,14 +172,13 @@ vy_task_new(struct mempool *pool, struct vy_lsm *lsm,
/** Free a task allocated with vy_task_new(). */
static void
vy_task_delete(struct mempool *pool, struct vy_task *task)
vy_task_delete(struct vy_task *task)
{
key_def_delete(task->cmp_def);
key_def_delete(task->key_def);
vy_lsm_unref(task->lsm);
diag_destroy(&task->diag);
TRASH(task);
mempool_free(pool, task);
mempool_free(&task->scheduler->task_pool, task);
}
static bool
......@@ -643,7 +646,7 @@ vy_run_discard(struct vy_run *run)
}
static int
vy_task_write_run(struct vy_scheduler *scheduler, struct vy_task *task)
vy_task_write_run(struct vy_task *task)
{
struct vy_lsm *lsm = task->lsm;
struct vy_stmt_stream *wi = task->wi;
......@@ -676,7 +679,7 @@ vy_task_write_run(struct vy_scheduler *scheduler, struct vy_task *task)
if (rc != 0)
break;
if (!scheduler->is_worker_pool_running) {
if (!task->scheduler->is_worker_pool_running) {
diag_set(FiberIsCancelled);
rc = -1;
break;
......@@ -698,14 +701,15 @@ vy_task_write_run(struct vy_scheduler *scheduler, struct vy_task *task)
}
static int
vy_task_dump_execute(struct vy_scheduler *scheduler, struct vy_task *task)
vy_task_dump_execute(struct vy_task *task)
{
return vy_task_write_run(scheduler, task);
return vy_task_write_run(task);
}
static int
vy_task_dump_complete(struct vy_scheduler *scheduler, struct vy_task *task)
vy_task_dump_complete(struct vy_task *task)
{
struct vy_scheduler *scheduler = task->scheduler;
struct vy_lsm *lsm = task->lsm;
struct vy_run *new_run = task->new_run;
int64_t dump_lsn = new_run->dump_lsn;
......@@ -871,8 +875,9 @@ vy_task_dump_complete(struct vy_scheduler *scheduler, struct vy_task *task)
}
static void
vy_task_dump_abort(struct vy_scheduler *scheduler, struct vy_task *task)
vy_task_dump_abort(struct vy_task *task)
{
struct vy_scheduler *scheduler = task->scheduler;
struct vy_lsm *lsm = task->lsm;
assert(lsm->is_dumping);
......@@ -975,8 +980,7 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm,
return 0;
}
struct vy_task *task = vy_task_new(&scheduler->task_pool,
lsm, &dump_ops);
struct vy_task *task = vy_task_new(scheduler, lsm, &dump_ops);
if (task == NULL)
goto err;
......@@ -1031,7 +1035,7 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm,
err_wi:
vy_run_discard(new_run);
err_run:
vy_task_delete(&scheduler->task_pool, task);
vy_task_delete(task);
err:
diag_log();
say_error("%s: could not start dump", vy_lsm_name(lsm));
......@@ -1039,14 +1043,15 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm,
}
static int
vy_task_compact_execute(struct vy_scheduler *scheduler, struct vy_task *task)
vy_task_compact_execute(struct vy_task *task)
{
return vy_task_write_run(scheduler, task);
return vy_task_write_run(task);
}
static int
vy_task_compact_complete(struct vy_scheduler *scheduler, struct vy_task *task)
vy_task_compact_complete(struct vy_task *task)
{
struct vy_scheduler *scheduler = task->scheduler;
struct vy_lsm *lsm = task->lsm;
struct vy_range *range = task->range;
struct vy_run *new_run = task->new_run;
......@@ -1191,8 +1196,9 @@ vy_task_compact_complete(struct vy_scheduler *scheduler, struct vy_task *task)
}
static void
vy_task_compact_abort(struct vy_scheduler *scheduler, struct vy_task *task)
vy_task_compact_abort(struct vy_task *task)
{
struct vy_scheduler *scheduler = task->scheduler;
struct vy_lsm *lsm = task->lsm;
struct vy_range *range = task->range;
......@@ -1243,8 +1249,7 @@ vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm,
return 0;
}
struct vy_task *task = vy_task_new(&scheduler->task_pool,
lsm, &compact_ops);
struct vy_task *task = vy_task_new(scheduler, lsm, &compact_ops);
if (task == NULL)
goto err_task;
......@@ -1303,7 +1308,7 @@ vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm,
err_wi:
vy_run_discard(new_run);
err_run:
vy_task_delete(&scheduler->task_pool, task);
vy_task_delete(task);
err_task:
diag_log();
say_error("%s: could not start compacting range %s: %s",
......@@ -1444,12 +1449,11 @@ vy_schedule(struct vy_scheduler *scheduler, struct vy_task **ptask)
}
static int
vy_scheduler_complete_task(struct vy_scheduler *scheduler,
struct vy_task *task)
vy_task_complete(struct vy_task *task)
{
if (task->lsm->is_dropped) {
if (task->ops->abort)
task->ops->abort(scheduler, task);
task->ops->abort(task);
return 0;
}
......@@ -1464,7 +1468,7 @@ vy_scheduler_complete_task(struct vy_scheduler *scheduler,
diag_move(diag_get(), diag);
goto fail; });
if (task->ops->complete &&
task->ops->complete(scheduler, task) != 0) {
task->ops->complete(task) != 0) {
assert(!diag_is_empty(diag_get()));
diag_move(diag_get(), diag);
goto fail;
......@@ -1472,8 +1476,8 @@ vy_scheduler_complete_task(struct vy_scheduler *scheduler,
return 0;
fail:
if (task->ops->abort)
task->ops->abort(scheduler, task);
diag_move(diag, &scheduler->diag);
task->ops->abort(task);
diag_move(diag, &task->scheduler->diag);
return -1;
}
......@@ -1510,11 +1514,11 @@ vy_scheduler_f(va_list va)
/* Complete and delete all processed tasks. */
stailq_foreach_entry_safe(task, next, &output_queue, link) {
if (vy_scheduler_complete_task(scheduler, task) != 0)
if (vy_task_complete(task) != 0)
tasks_failed++;
else
tasks_done++;
vy_task_delete(&scheduler->task_pool, task);
vy_task_delete(task);
scheduler->workers_available++;
assert(scheduler->workers_available <=
scheduler->worker_pool_size);
......@@ -1615,7 +1619,7 @@ vy_worker_f(void *arg)
assert(task != NULL);
/* Execute task */
task->status = task->ops->execute(scheduler, task);
task->status = task->ops->execute(task);
if (task->status != 0) {
struct diag *diag = diag_get();
assert(!diag_is_empty(diag));
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment