diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index bdc1cfa3142f5f0e0440ddbd2d1ddae1d331d4ec..098a01419eaa56a2304290eb9ce58fb93f86578b 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -51,6 +51,7 @@
 #include "errcode.h"
 #include "errinj.h"
 #include "fiber.h"
+#include "fiber_cond.h"
 #include "iproto_constants.h" /* IPROTO_INSERT */
 #include "key_def.h"
 #include "latch.h"
@@ -130,6 +131,16 @@ static const char *vy_log_type_name[] = {
 	[VY_LOG_ABORT_REBOOTSTRAP] = "abort_rebootstrap",
 };
 
+/** Batch of vylog records that must be written in one go. */
+struct vy_log_tx {
+	/** Link in vy_log::pending_tx. */
+	struct stailq_entry in_pending;
+	/** Region used for allocating records. */
+	struct region region;
+	/** List of records, linked by vy_log_record::in_tx. */
+	struct stailq records;
+};
+
 /** Metadata log object. */
 struct vy_log {
 	/**
@@ -142,27 +153,36 @@ struct vy_log {
 	struct vclock last_checkpoint;
 	/** Recovery context. */
 	struct vy_recovery *recovery;
-	/** Latch protecting the log buffer. */
+	/**
+	 * Latch that syncs log writers against readers.
+	 * Needed so that we don't miss any records during
+	 * log rotation.
+	 */
 	struct latch latch;
+	/**
+	 * Background fiber flushing pending transactions.
+	 * Lives throughout the vinyl engine lifetime. Note that
+	 * we don't stop it in the destructor, because the event
+	 * loop is dead at that time, so we can't properly
+	 * join it.
+	 */
+	struct fiber *flusher;
+	/** Condition variable used for signalling the flusher. */
+	struct fiber_cond flusher_cond;
 	/**
 	 * Next ID to use for a vinyl object.
 	 * Used by vy_log_next_id().
 	 */
 	int64_t next_id;
-	/** A region of struct vy_log_record entries. */
-	struct region pool;
-	/**
-	 * Records awaiting to be written to disk.
-	 * Linked by vy_log_record::in_tx;
-	 */
-	struct stailq tx;
-	/** Start of the current transaction in the pool, for rollback */
-	size_t tx_svp;
+	/** Pool of vy_log_tx objects. */
+	struct mempool tx_pool;
+	/** Current transaction or NULL. */
+	struct vy_log_tx *tx;
 	/**
-	 * Last record in the queue at the time when the current
-	 * transaction was started. Used for rollback.
+	 * List of transactions waiting to be flushed to disk,
+	 * linked by vy_log_tx::in_pending.
 	 */
-	struct stailq_entry *tx_begin;
+	struct stailq pending_tx;
 	/**
	 * Flag set if vy_log_write() failed.
	 *
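
The struct changes above are the core of the rework: instead of one shared record buffer (pool/tx plus savepoint bookkeeping), every transaction now owns a vy_log_tx with its own region and record list, and committed batches are queued on pending_tx for a background flusher fiber. One practical payoff is rollback, which no longer needs surgery on shared state. A minimal sketch of the contrast, using only identifiers that appear in this patch (illustrative, not part of the diff):

    /* Old scheme: undo a failed transaction inside the shared buffer. */
    stailq_cut_tail(&vy_log.tx, vy_log.tx_begin, &rollback);
    region_truncate(&vy_log.pool, vy_log.tx_svp);

    /* New scheme: a failed transaction is simply thrown away. */
    region_destroy(&tx->region);       /* frees all records it duplicated */
    mempool_free(&vy_log.tx_pool, tx); /* i.e. vy_log_tx_delete(tx) */
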
@@ -181,6 +201,9 @@ struct vy_log {
 };
 
 static struct vy_log vy_log;
 
+static int
+vy_log_flusher_f(va_list va);
+
 static struct vy_recovery *
 vy_recovery_new_locked(int64_t signature, int flags);
 
@@ -737,23 +760,47 @@ vy_log_init(const char *dir)
 	xdir_create(&vy_log.dir, dir, VYLOG, &INSTANCE_UUID,
 		    &xlog_opts_default);
 	latch_create(&vy_log.latch);
-	region_create(&vy_log.pool, cord_slab_cache());
-	stailq_create(&vy_log.tx);
+	mempool_create(&vy_log.tx_pool, cord_slab_cache(),
+		       sizeof(struct vy_log_tx));
+	stailq_create(&vy_log.pending_tx);
 	diag_create(&vy_log.tx_diag);
 	wal_init_vy_log();
+	fiber_cond_create(&vy_log.flusher_cond);
+	vy_log.flusher = fiber_new("vinyl.vylog_flusher",
+				   vy_log_flusher_f);
+	if (vy_log.flusher == NULL)
+		panic("failed to allocate vylog flusher fiber");
+	fiber_wakeup(vy_log.flusher);
+}
+
+static struct vy_log_tx *
+vy_log_tx_new(void)
+{
+	struct vy_log_tx *tx = mempool_alloc(&vy_log.tx_pool);
+	if (tx == NULL) {
+		diag_set(OutOfMemory, sizeof(*tx), "mempool", "vy log tx");
+		return NULL;
+	}
+	region_create(&tx->region, cord_slab_cache());
+	stailq_create(&tx->records);
+	tx->in_pending.next = NULL;
+	return tx;
+}
+
+static void
+vy_log_tx_delete(struct vy_log_tx *tx)
+{
+	region_destroy(&tx->region);
+	mempool_free(&vy_log.tx_pool, tx);
 }
 
 /**
- * Try to flush the log buffer to disk.
- *
- * We always flush the entire vy_log buffer as a single xlog
- * transaction, since we do not track boundaries of @no_discard
- * buffered transactions, and want to avoid a partial write.
+ * Write a given transaction to disk.
  */
 static int
-vy_log_flush(void)
+vy_log_tx_flush(struct vy_log_tx *tx)
 {
-	if (stailq_empty(&vy_log.tx))
+	if (stailq_empty(&tx->records))
 		return 0; /* nothing to do */
 
 	ERROR_INJECT(ERRINJ_VY_LOG_FLUSH, {
@@ -768,7 +815,7 @@ vy_log_flush(void)
 
 	int tx_size = 0;
 	struct vy_log_record *record;
-	stailq_foreach_entry(record, &vy_log.tx, in_tx)
+	stailq_foreach_entry(record, &tx->records, in_tx)
 		tx_size++;
 
 	size_t used = region_used(&fiber()->gc);
@@ -787,7 +834,9 @@ vy_log_flush(void)
 	 * Encode buffered records.
 	 */
 	int i = 0;
-	stailq_foreach_entry(record, &vy_log.tx, in_tx) {
+	stailq_foreach_entry(record, &tx->records, in_tx) {
+		if (record->gc_lsn == VY_LOG_GC_LSN_CURRENT)
+			record->gc_lsn = vy_log_signature();
 		assert(i < tx_size);
 		struct xrow_header *row = &rows[i];
 		if (vy_log_record_encode(record, row) < 0)
@@ -804,9 +853,6 @@ vy_log_flush(void)
 	if (wal_write_vy_log(entry) != 0)
 		goto err;
 
-	/* Success. Free flushed records. */
-	region_reset(&vy_log.pool);
-	stailq_create(&vy_log.tx);
 	region_truncate(&fiber()->gc, used);
 	return 0;
 err:
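
The placement of the VY_LOG_GC_LSN_CURRENT substitution above is deliberate: it happens inside vy_log_tx_flush(), when the record is encoded, not when it was buffered. Since a flush can now happen long after commit, and possibly after a vylog rotation, vy_log_signature() is evaluated against the file the record actually lands in. A condensed restatement of that loop, for emphasis only (not additional code):

    stailq_foreach_entry(record, &tx->records, in_tx) {
        /*
         * Late binding: pick up the signature of the vylog
         * file being written right now, after any rotation.
         */
        if (record->gc_lsn == VY_LOG_GC_LSN_CURRENT)
            record->gc_lsn = vy_log_signature();
        /* ... encode the record into an xrow ... */
    }
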
@@ -814,11 +860,78 @@ vy_log_flush(void)
 	return -1;
 }
 
+/**
+ * Write all pending transactions to disk.
+ */
+static int
+vy_log_flush(void)
+{
+	/*
+	 * vy_log_tx_try_commit() can add a new transaction to
+	 * the list while we are writing to disk. This is okay -
+	 * we'll flush it next time. If we fail, we put remaining
+	 * transactions back to the head of the list to preserve
+	 * the commit order.
+	 */
+	struct stailq pending;
+	stailq_create(&pending);
+	stailq_concat(&pending, &vy_log.pending_tx);
+
+	int rc = 0;
+	while (!stailq_empty(&pending)) {
+		struct vy_log_tx *tx = stailq_first_entry(&pending,
+				struct vy_log_tx, in_pending);
+		rc = vy_log_tx_flush(tx);
+		if (rc != 0)
+			break;
+		stailq_shift(&pending);
+		vy_log_tx_delete(tx);
+	}
+	stailq_concat(&pending, &vy_log.pending_tx);
+	stailq_concat(&vy_log.pending_tx, &pending);
+	return rc;
+}
+
+static int
+vy_log_flusher_f(va_list va)
+{
+	(void)va;
+	while (!fiber_is_cancelled()) {
+		/*
+		 * Disable writes during local recovery.
+		 * See vy_log_tx_commit().
+		 */
+		if (vy_log.recovery != NULL ||
+		    stailq_empty(&vy_log.pending_tx)) {
+			fiber_cond_wait(&vy_log.flusher_cond);
+			continue;
+		}
+		latch_lock(&vy_log.latch);
+		int rc = vy_log_flush();
+		latch_unlock(&vy_log.latch);
+		if (rc != 0) {
+			diag_log();
+			say_error("failed to flush vylog");
+			/*
+			 * Don't retry immediately after a failure,
+			 * since the next write is likely to fail
+			 * as well. Instead wait for the next signal.
+			 */
+			fiber_cond_wait(&vy_log.flusher_cond);
+		}
+	}
+	return 0;
+}
+
 void
 vy_log_free(void)
 {
+	struct vy_log_tx *tx, *next_tx;
+	stailq_foreach_entry_safe(tx, next_tx, &vy_log.pending_tx, in_pending)
+		vy_log_tx_delete(tx);
+	stailq_create(&vy_log.pending_tx);
+	mempool_destroy(&vy_log.tx_pool);
 	xdir_destroy(&vy_log.dir);
-	region_destroy(&vy_log.pool);
 	diag_destroy(&vy_log.tx_diag);
 }
 
@@ -998,9 +1111,12 @@ vy_log_end_recovery(void)
 	 * Update the recovery context with records written during
 	 * recovery - we will need them for garbage collection.
 	 */
-	struct vy_log_record *record;
-	stailq_foreach_entry(record, &vy_log.tx, in_tx)
-		vy_recovery_process_record(vy_log.recovery, record);
+	struct vy_log_tx *tx;
+	stailq_foreach_entry(tx, &vy_log.pending_tx, in_pending) {
+		struct vy_log_record *record;
+		stailq_foreach_entry(record, &tx->records, in_tx)
+			vy_recovery_process_record(vy_log.recovery, record);
+	}
 
 	/* Flush all pending records. */
 	if (vy_log_flush() < 0) {
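
The flusher fiber added above follows the usual fiber_cond consumer pattern: re-check the predicate after every wakeup, and park on the condition variable both when idle and after a failed write, so a persistent disk error cannot degenerate into a busy loop. Schematically (work_available() and do_flush() are hypothetical placeholders, not real API):

    while (!fiber_is_cancelled()) {
        if (!work_available())          /* recovery in progress or queue empty */
            fiber_cond_wait(&cond);     /* sleep until a commit signals us */
        else if (do_flush() != 0)
            fiber_cond_wait(&cond);     /* back off until the next signal */
    }
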
@@ -1123,102 +1239,95 @@ vy_log_backup_path(const struct vclock *vclock)
 void
 vy_log_tx_begin(void)
 {
-	latch_lock(&vy_log.latch);
-	vy_log.tx_begin = stailq_last(&vy_log.tx);
-	vy_log.tx_svp = region_used(&vy_log.pool);
-	vy_log.tx_failed = false;
+	assert(!vy_log.tx_failed);
+	assert(vy_log.tx == NULL);
+	vy_log.tx = vy_log_tx_new();
+	if (vy_log.tx == NULL) {
+		diag_move(diag_get(), &vy_log.tx_diag);
+		vy_log.tx_failed = true;
+	}
 	say_verbose("begin vylog transaction");
 }
 
-/**
- * Commit a transaction started with vy_log_tx_begin().
- *
- * If @no_discard is set, pending records won't be expunged from the
- * buffer on failure, so that the next transaction will retry to write
- * them to disk.
- */
-static int
-vy_log_tx_do_commit(bool no_discard)
+int
+vy_log_tx_commit(void)
 {
-	struct stailq rollback;
+	/*
+	 * During recovery, we may replay records we failed to commit
+	 * before restart (e.g. drop LSM tree). Since the log isn't open
+	 * yet, simply leave them in the tx buffer to be flushed upon
+	 * recovery completion.
+	 */
+	if (vy_log.recovery != NULL) {
+		vy_log_tx_try_commit();
+		return 0;
+	}
 
-	assert(latch_owner(&vy_log.latch) == fiber());
+	struct vy_log_tx *tx = vy_log.tx;
+	vy_log.tx = NULL;
 
 	if (vy_log.tx_failed) {
-		/*
-		 * vy_log_write() failed to append a record to tx.
-		 * @no_discard transactions can't handle this.
-		 */
 		diag_move(&vy_log.tx_diag, diag_get());
-		if (no_discard) {
-			diag_log();
-			panic("non-discardable vylog transaction failed");
-		}
-		goto rollback;
+		vy_log.tx_failed = false;
+		if (tx != NULL)
+			vy_log_tx_delete(tx);
+		goto err;
 	}
+	assert(tx != NULL);
 
 	/*
-	 * During recovery, we may replay records we failed to commit
-	 * before restart (e.g. drop LSM tree). Since the log isn't open
-	 * yet, simply leave them in the tx buffer to be flushed upon
-	 * recovery completion.
+	 * Before writing this transaction, flush all pending ones
+	 * if any, because they were committed first.
 	 */
-	if (vy_log.recovery != NULL)
-		goto done;
+	latch_lock(&vy_log.latch);
+	int rc = vy_log_flush();
+	if (rc == 0)
+		rc = vy_log_tx_flush(tx);
+	latch_unlock(&vy_log.latch);
 
-	if (vy_log_flush() != 0) {
-		if (!no_discard)
-			goto rollback;
-		/*
-		 * We were told not to discard the transaction on
-		 * failure so just warn and leave it in the buffer.
-		 */
-		struct error *e = diag_last_error(diag_get());
-		say_warn("failed to flush vylog: %s", e->errmsg);
-	}
+	vy_log_tx_delete(tx);
+	if (rc != 0)
+		goto err;
 
-done:
 	say_verbose("commit vylog transaction");
-	latch_unlock(&vy_log.latch);
 	return 0;
-
-rollback:
-	stailq_cut_tail(&vy_log.tx, vy_log.tx_begin, &rollback);
-	region_truncate(&vy_log.pool, vy_log.tx_svp);
-	vy_log.tx_svp = 0;
+err:
 	say_verbose("rollback vylog transaction");
-	latch_unlock(&vy_log.latch);
 	return -1;
 }
 
-int
-vy_log_tx_commit(void)
-{
-	return vy_log_tx_do_commit(false);
-}
-
 void
 vy_log_tx_try_commit(void)
 {
-	if (vy_log_tx_do_commit(true) != 0)
-		unreachable();
+	if (vy_log.tx_failed) {
+		diag_move(&vy_log.tx_diag, diag_get());
+		diag_log();
+		panic("non-discardable vylog transaction failed");
+	}
+	assert(vy_log.tx != NULL);
+	stailq_add_tail_entry(&vy_log.pending_tx, vy_log.tx, in_pending);
+	fiber_cond_signal(&vy_log.flusher_cond);
+	vy_log.tx = NULL;
+	say_verbose("commit vylog transaction");
 }
 
 void
 vy_log_write(const struct vy_log_record *record)
 {
-	assert(latch_owner(&vy_log.latch) == fiber());
+	say_verbose("write vylog record: %s", vy_log_record_str(record));
 
-	struct vy_log_record *tx_record = vy_log_record_dup(&vy_log.pool,
+	if (vy_log.tx_failed)
+		return;
+
+	assert(vy_log.tx != NULL);
+	struct vy_log_record *tx_record = vy_log_record_dup(&vy_log.tx->region,
 							    record);
 	if (tx_record == NULL) {
 		diag_move(diag_get(), &vy_log.tx_diag);
 		vy_log.tx_failed = true;
 		return;
 	}
-
-	say_verbose("write vylog record: %s", vy_log_record_str(tx_record));
-	stailq_add_tail_entry(&vy_log.tx, tx_record, in_tx);
+	stailq_add_tail_entry(&vy_log.tx->records, tx_record, in_tx);
 }
 
 /**
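
With the vy_log.c changes complete, there are now two commit flavors with different blocking behavior: vy_log_tx_commit() still yields and returns only once the transaction, and everything committed before it, is on disk, while vy_log_tx_try_commit() never yields; it merely appends the batch to pending_tx and signals the flusher. A hedged usage sketch (handle_error() is a placeholder, not real API):

    /* Durable commit: the caller can afford to yield. */
    vy_log_tx_begin();
    vy_log_write(&record);
    if (vy_log_tx_commit() != 0)
        handle_error();         /* the failed batch has been discarded */

    /* Non-yielding commit: the flusher fiber writes it out later. */
    vy_log_tx_begin();
    vy_log_write(&record);
    vy_log_tx_try_commit();     /* never fails; panics only on earlier OOM */
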
diff --git a/src/box/vy_log.h b/src/box/vy_log.h
index ee38c19381693d4c78a000e4c37d2c818241d853..298a8ed4cd80bb4610c0adf7cff24e094903eaf3 100644
--- a/src/box/vy_log.h
+++ b/src/box/vy_log.h
@@ -221,6 +221,16 @@ enum vy_log_record_type {
 	vy_log_record_type_MAX
 };
 
+/**
+ * Special value of vy_log_record::gc_lsn that is replaced with the
+ * signature of the vylog file the record is written to. We need it
+ * to make sure we write the current vylog signature (not the previous
+ * one) when compaction completion races with vylog rotation. Writing
+ * the previous vylog signature would result in premature run file
+ * collection.
+ */
+enum { VY_LOG_GC_LSN_CURRENT = -1 };
+
 /** Record in the metadata log. */
 struct vy_log_record {
 	/** Type of the record. */
@@ -273,7 +283,7 @@ struct vy_log_record {
 	int64_t gc_lsn;
 	/** For runs: number of dumps it took to create the run. */
 	uint32_t dump_count;
-	/** Link in vy_log::tx. */
+	/** Link in vy_log_tx::records. */
 	struct stailq_entry in_tx;
 };
 
@@ -510,6 +520,8 @@ vy_log_tx_commit(void);
  * buffered records to disk, but in case of failure pending records
  * are not expunged from the buffer, so that the next transaction
  * will retry to flush them.
+ *
+ * In contrast to vy_log_tx_commit(), this function doesn't yield.
  */
 void
 vy_log_tx_try_commit(void);
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index 0180331e3b0a7178d0ed0fad4e856c800382abc9..85c1659b0219d8f31b9f38b8b08237e5f7f30417 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -1503,9 +1503,8 @@ vy_task_compaction_complete(struct vy_task *task)
 		if (slice == last_slice)
 			break;
 	}
-	int64_t gc_lsn = vy_log_signature();
 	rlist_foreach_entry(run, &unused_runs, in_unused)
-		vy_log_drop_run(run->id, gc_lsn);
+		vy_log_drop_run(run->id, VY_LOG_GC_LSN_CURRENT);
 	if (new_slice != NULL) {
 		vy_log_create_run(lsm->id, new_run->id, new_run->dump_lsn,
 				  new_run->dump_count);
@@ -1530,7 +1529,7 @@ vy_task_compaction_complete(struct vy_task *task)
 	 * next checkpoint.
 	 */
 	rlist_foreach_entry(run, &unused_runs, in_unused) {
-		if (run->dump_lsn > gc_lsn)
+		if (run->dump_lsn > vy_log_signature())
 			vy_run_remove_files(lsm->env->path, lsm->space_id,
 					    lsm->index_id, run->id);
 	}
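
The vy_scheduler.c hunks are the motivating consumer of the new sentinel. Capturing vy_log_signature() when compaction completes and writing it later is racy now that the actual vylog write may be deferred: if the log rotates in between, dropped runs get stamped with the previous vylog's signature and their files may be collected prematurely. A timeline sketch of the race the sentinel closes (S1 and S2 are illustrative signatures, not real values):

    /* Racy (old): the signature is captured before the deferred write. */
    int64_t gc_lsn = vy_log_signature();    /* current vylog is S1 */
    /* ... vylog rotates, the current signature becomes S2 ... */
    vy_log_drop_run(run->id, gc_lsn);       /* record stamped with stale S1 */

    /* Fixed (new): the sentinel is resolved by vy_log_tx_flush(). */
    vy_log_drop_run(run->id, VY_LOG_GC_LSN_CURRENT);    /* resolves to S2 */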