Skip to content
Snippets Groups Projects
Commit 46f50aad authored by Vladimir Davydov's avatar Vladimir Davydov
Browse files

vinyl: rename some members of vy_scheduler and vy_task struct

I'm planning to add some new members and remove some old members from
those structs. For this to play nicely, let's do some renames:

  vy_scheduler::workers_available => idle_worker_count
  vy_scheduler::input_queue       => pending_tasks
  vy_scheduler::output_queue      => processed_tasks
  vy_task::link                   => in_pending, in_processed
parent 1331d232
No related branches found
No related tags found
No related merge requests found
......@@ -119,11 +119,6 @@ struct vy_task {
* need to remember the slices we are compacting.
*/
struct vy_slice *first_slice, *last_slice;
/**
* Link in the list of pending or processed tasks.
* See vy_scheduler::input_queue, output_queue.
*/
struct stailq_entry link;
/**
* Index options may be modified while a task is in
* progress so we save them here to safely access them
......@@ -131,6 +126,10 @@ struct vy_task {
*/
double bloom_fpr;
int64_t page_size;
/** Link in vy_scheduler::pending_tasks. */
struct stailq_entry in_pending;
/** Link in vy_scheduler::processed_tasks. */
struct stailq_entry in_processed;
};
/**
......@@ -259,7 +258,7 @@ vy_scheduler_start_workers(struct vy_scheduler *scheduler)
assert(scheduler->worker_pool_size >= 2);
scheduler->is_worker_pool_running = true;
scheduler->workers_available = scheduler->worker_pool_size;
scheduler->idle_worker_count = scheduler->worker_pool_size;
scheduler->worker_pool = calloc(scheduler->worker_pool_size,
sizeof(struct cord));
if (scheduler->worker_pool == NULL)
......@@ -318,8 +317,8 @@ vy_scheduler_create(struct vy_scheduler *scheduler, int write_threads,
scheduler->worker_pool_size = write_threads;
mempool_create(&scheduler->task_pool, cord_slab_cache(),
sizeof(struct vy_task));
stailq_create(&scheduler->input_queue);
stailq_create(&scheduler->output_queue);
stailq_create(&scheduler->pending_tasks);
stailq_create(&scheduler->processed_tasks);
tt_pthread_cond_init(&scheduler->worker_cond, NULL);
tt_pthread_mutex_init(&scheduler->mutex, NULL);
......@@ -1422,7 +1421,7 @@ vy_schedule(struct vy_scheduler *scheduler, struct vy_task **ptask)
if (*ptask != NULL)
return 0;
if (scheduler->workers_available <= 1) {
if (scheduler->idle_worker_count <= 1) {
/*
* If all worker threads are busy doing compaction
* when we run out of quota, ongoing transactions will
......@@ -1501,26 +1500,27 @@ vy_scheduler_f(va_list va)
vy_scheduler_start_workers(scheduler);
while (scheduler->scheduler_fiber != NULL) {
struct stailq output_queue;
struct stailq processed_tasks;
struct vy_task *task, *next;
int tasks_failed = 0, tasks_done = 0;
bool was_empty;
/* Get the list of processed tasks. */
stailq_create(&output_queue);
stailq_create(&processed_tasks);
tt_pthread_mutex_lock(&scheduler->mutex);
stailq_concat(&output_queue, &scheduler->output_queue);
stailq_concat(&processed_tasks, &scheduler->processed_tasks);
tt_pthread_mutex_unlock(&scheduler->mutex);
/* Complete and delete all processed tasks. */
stailq_foreach_entry_safe(task, next, &output_queue, link) {
stailq_foreach_entry_safe(task, next, &processed_tasks,
in_processed) {
if (vy_task_complete(task) != 0)
tasks_failed++;
else
tasks_done++;
vy_task_delete(task);
scheduler->workers_available++;
assert(scheduler->workers_available <=
scheduler->idle_worker_count++;
assert(scheduler->idle_worker_count <=
scheduler->worker_pool_size);
}
/*
......@@ -1534,7 +1534,7 @@ vy_scheduler_f(va_list va)
* opens a time window for a worker to submit
* a processed task and wake up the scheduler
* (via scheduler_async). Hence we should go
* and recheck the output_queue in order not
* and recheck the processed_tasks in order not
* to lose a wakeup event and hang for good.
*/
continue;
......@@ -1543,7 +1543,7 @@ vy_scheduler_f(va_list va)
if (tasks_failed > 0)
goto error;
/* All worker threads are busy. */
if (scheduler->workers_available == 0)
if (scheduler->idle_worker_count == 0)
goto wait;
/* Get a task to schedule. */
if (vy_schedule(scheduler, &task) != 0)
......@@ -1554,13 +1554,14 @@ vy_scheduler_f(va_list va)
/* Queue the task and notify workers if necessary. */
tt_pthread_mutex_lock(&scheduler->mutex);
was_empty = stailq_empty(&scheduler->input_queue);
stailq_add_tail_entry(&scheduler->input_queue, task, link);
was_empty = stailq_empty(&scheduler->pending_tasks);
stailq_add_tail_entry(&scheduler->pending_tasks,
task, in_pending);
if (was_empty)
tt_pthread_cond_signal(&scheduler->worker_cond);
tt_pthread_mutex_unlock(&scheduler->mutex);
scheduler->workers_available--;
scheduler->idle_worker_count--;
fiber_reschedule();
continue;
error:
......@@ -1605,7 +1606,7 @@ vy_worker_f(void *arg)
tt_pthread_mutex_lock(&scheduler->mutex);
while (scheduler->is_worker_pool_running) {
/* Wait for a task */
if (stailq_empty(&scheduler->input_queue)) {
if (stailq_empty(&scheduler->pending_tasks)) {
/* Wake scheduler up if there are no more tasks */
ev_async_send(scheduler->scheduler_loop,
&scheduler->scheduler_async);
......@@ -1613,8 +1614,8 @@ vy_worker_f(void *arg)
&scheduler->mutex);
continue;
}
task = stailq_shift_entry(&scheduler->input_queue,
struct vy_task, link);
task = stailq_shift_entry(&scheduler->pending_tasks,
struct vy_task, in_pending);
tt_pthread_mutex_unlock(&scheduler->mutex);
assert(task != NULL);
......@@ -1628,7 +1629,8 @@ vy_worker_f(void *arg)
/* Return processed task to scheduler */
tt_pthread_mutex_lock(&scheduler->mutex);
stailq_add_tail_entry(&scheduler->output_queue, task, link);
stailq_add_tail_entry(&scheduler->processed_tasks,
task, in_processed);
}
tt_pthread_mutex_unlock(&scheduler->mutex);
return NULL;
......
......@@ -77,13 +77,13 @@ struct vy_scheduler {
/** Total number of worker threads. */
int worker_pool_size;
/** Number worker threads that are currently idle. */
int workers_available;
int idle_worker_count;
/** Memory pool used for allocating vy_task objects. */
struct mempool task_pool;
/** Queue of pending tasks, linked by vy_task::link. */
struct stailq input_queue;
/** Queue of processed tasks, linked by vy_task::link. */
struct stailq output_queue;
/** Queue of pending tasks, linked by vy_task::in_pending. */
struct stailq pending_tasks;
/** Queue of processed tasks, linked by vy_task::in_processed. */
struct stailq processed_tasks;
/**
* Signaled to wake up a worker when there is
* a pending task in the input queue. Also used
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment