diff --git a/src/box/alter.cc b/src/box/alter.cc
index 4e9452280c6c648a1aa0277d071e349385739ccc..e76b9e68535cfa2228ea1c937d4c790b00985e32 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -3575,6 +3575,12 @@ unlock_after_dd(struct trigger *trigger, void *event)
 {
 	(void) trigger;
 	(void) event;
+	/*
+	 * In case of yielding journal this trigger will be processed
+	 * in a context of tx_prio endpoint instead of a context of
+	 * a fiber which has this latch locked. So steal the latch first.
+	 */
+	latch_steal(&schema_lock);
 	latch_unlock(&schema_lock);
 }
 
diff --git a/src/box/box.cc b/src/box/box.cc
index d53b0cdc587b837fb2bfe104439c523213a1563b..f5bd29dd5930fe1e4209cafa78a88f9b9e70b96c 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -303,10 +303,12 @@ struct recovery_journal {
  */
 static int64_t
 recovery_journal_write(struct journal *base,
-		       struct journal_entry * /* entry */)
+		       struct journal_entry *entry)
 {
 	struct recovery_journal *journal = (struct recovery_journal *) base;
-	return vclock_sum(journal->vclock);
+	entry->res = vclock_sum(journal->vclock);
+	journal_entry_complete(entry);
+	return entry->res;
 }
 
 static inline void
diff --git a/src/box/journal.c b/src/box/journal.c
index fe13fb6eeaaf00f3a304f949151bd7716adefa28..4c1997f36488ef4c0a16c022f454e49527a09a24 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -41,7 +41,8 @@ static int64_t
 dummy_journal_write(struct journal *journal, struct journal_entry *entry)
 {
 	(void) journal;
-	(void) entry;
+	entry->res = 0;
+	journal_entry_complete(entry);
 	return 0;
 }
 
@@ -53,7 +54,9 @@ static struct journal dummy_journal = {
 struct journal *current_journal = &dummy_journal;
 
 struct journal_entry *
-journal_entry_new(size_t n_rows, struct region *region)
+journal_entry_new(size_t n_rows, struct region *region,
+		  journal_entry_done_cb on_done_cb,
+		  void *on_done_cb_data)
 {
 	struct journal_entry *entry;
 
@@ -70,6 +73,8 @@ journal_entry_new(size_t n_rows, struct region *region)
 	entry->n_rows = n_rows;
 	entry->res = -1;
 	entry->fiber = fiber();
+	entry->on_done_cb = on_done_cb;
+	entry->on_done_cb_data = on_done_cb_data;
 	return entry;
 }
 
diff --git a/src/box/journal.h b/src/box/journal.h
index 8ac32ee5e15566bcbb16b3ad3352255a7d9e12fb..a4a3f032fda71fdda9ca800840132f1e5b2f2549 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -39,6 +39,11 @@ extern "C" {
 #endif /* defined(__cplusplus) */
 
 struct xrow_header;
+struct journal_entry;
+
+/** Journal entry finalization callback typedef. */
+typedef void (*journal_entry_done_cb)(struct journal_entry *entry, void *data);
+
 /**
  * An entry for an abstract journal.
  * Simply put, a write ahead log request.
@@ -58,6 +63,17 @@ struct journal_entry {
 	 * The fiber issuing the request.
 	 */
 	struct fiber *fiber;
+	/**
+	 * A journal entry finalization callback which is going to be called
+	 * after the entry processing was finished in both cases: success
+	 * or fail. Entry->res is set to a result value before the callback
+	 * is fired.
+	 */
+	journal_entry_done_cb on_done_cb;
+	/**
+	 * A journal entry completion callback argument.
+	 */
+	void *on_done_cb_data;
 	/**
 	 * Approximate size of this request when encoded.
 	 */
@@ -80,7 +96,21 @@ struct region;
  * @return NULL if out of memory, fiber diagnostics area is set
  */
 struct journal_entry *
-journal_entry_new(size_t n_rows, struct region *region);
+journal_entry_new(size_t n_rows, struct region *region,
+		  journal_entry_done_cb on_done_cb,
+		  void *on_done_cb_data);
+
+/**
+ * Finalize a single entry: fire its completion callback, if any.
+ * Entry->res must be set before the call. Entries created with a
+ * NULL callback (e.g. by vy_log_tx_flush()) are silently skipped.
+ */
+static inline void
+journal_entry_complete(struct journal_entry *entry)
+{
+	if (entry->on_done_cb != NULL)
+		entry->on_done_cb(entry, entry->on_done_cb_data);
+}
 
 /**
  * An API for an abstract journal for all transactions of this
diff --git a/src/box/txn.c b/src/box/txn.c
index 1eb4db6a3a584954eff4e7aa18e450c239f3ce71..59d8966613895655ae717243e9cce81feed2f0d1 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -337,6 +337,68 @@ txn_commit_stmt(struct txn *txn, struct request *request)
 	return -1;
 }
 
+/*
+ * A helper function to process on_commit/on_rollback triggers.
+ */
+static inline void
+txn_run_triggers(struct txn *txn, struct rlist *trigger)
+{
+	/*
+	 * Some triggers require the in_txn variable to be set, so
+	 * restore it while the triggers are in progress.
+	 */
+	fiber_set_txn(fiber(), txn);
+	/* Commit/rollback triggers must not throw. */
+	if (trigger_run(trigger, txn) != 0) {
+		/*
+		 * The transaction can't handle a trigger error, so
+		 * there is no option except panic.
+		 */
+		diag_log();
+		unreachable();
+		panic("commit/rollback trigger failed");
+	}
+	fiber_set_txn(fiber(), NULL);
+}
+
+/**
+ * Complete transaction processing: roll the transaction back if
+ * txn->signature is negative, commit it otherwise.
+ */
+static void
+txn_complete(struct txn *txn)
+{
+	/*
+	 * Note, engine can be NULL if transaction contains
+	 * IPROTO_NOP statements only.
+	 */
+	if (txn->signature < 0) {
+		/* Undo the transaction. */
+		if (txn->engine != NULL)
+			engine_rollback(txn->engine, txn);
+		if (txn->has_triggers)
+			txn_run_triggers(txn, &txn->on_rollback);
+	} else {
+		/* Commit the transaction. */
+		if (txn->engine != NULL)
+			engine_commit(txn->engine, txn);
+		if (txn->has_triggers)
+			txn_run_triggers(txn, &txn->on_commit);
+	}
+}
+
+/**
+ * Journal entry completion callback: store the write result in
+ * the transaction signature and complete the transaction.
+ */
+static void
+txn_entry_done_cb(struct journal_entry *entry, void *data)
+{
+	struct txn *txn = data;
+	txn->signature = entry->res;
+	txn_complete(txn);
+}
+
 static int64_t
 txn_write_to_wal(struct txn *txn)
 {
@@ -344,7 +406,17 @@ txn_write_to_wal(struct txn *txn)
 
 	struct journal_entry *req = journal_entry_new(txn->n_new_rows +
 						      txn->n_applier_rows,
-						      &txn->region);
+						      &txn->region,
+						      txn_entry_done_cb,
+						      txn);
 	if (req == NULL) {
-		txn_rollback(txn);
+		/*
+		 * txn_commit() has already detached the transaction
+		 * from the fiber, so txn_rollback(), which asserts
+		 * txn == in_txn(), can't be used here. Roll back and
+		 * free the transaction by hand instead.
+		 */
+		txn->signature = -1;
+		txn_complete(txn);
+		txn_free(txn);
 		return -1;
@@ -370,16 +442,14 @@ txn_write_to_wal(struct txn *txn)
 	ev_tstamp stop = ev_monotonic_now(loop());
 
 	if (res < 0) {
-		/* Cascading rollback. */
-		txn_rollback(txn); /* Perform our part of cascading rollback. */
-		/*
-		 * Move fiber to end of event loop to avoid
-		 * execution of any new requests before all
-		 * pending rollbacks are processed.
-		 */
-		fiber_reschedule();
 		diag_set(ClientError, ER_WAL_IO);
 		diag_log();
+		/*
+		 * Despite the fact that the transaction was rolled back
+		 * by the journal completion callback, it's our duty to
+		 * free it.
+		 */
+		txn_free(txn);
 	} else if (stop - start > too_long_threshold) {
 		int n_rows = txn->n_new_rows + txn->n_applier_rows;
 		say_warn_ratelimited("too long WAL write: %d rows at "
@@ -418,30 +488,28 @@ txn_commit(struct txn *txn)
 	}
 	trigger_clear(&txn->fiber_on_stop);
 
+	/*
+	 * After this point the transaction must not be used
+	 * so reset the corresponding key in the fiber storage.
+	 */
+	fiber_set_txn(fiber(), NULL);
 	if (txn->n_new_rows + txn->n_applier_rows > 0) {
-		txn->signature = txn_write_to_wal(txn);
-		if (txn->signature < 0)
+		/*
+		 * On failure txn_write_to_wal() has already freed
+		 * the transaction, so the result must not be stored
+		 * in it. On success the journal completion callback
+		 * has already set txn->signature.
+		 */
+		if (txn_write_to_wal(txn) < 0)
 			return -1;
+	} else {
+		/*
+		 * Even though there's nothing to write to WAL,
+		 * we still must complete the transaction.
+		 */
+		txn->signature = 0;
+		txn_complete(txn);
 	}
-	/*
-	 * Engine can be NULL if transaction contains IPROTO_NOP
-	 * statements only.
-	 */
-	if (txn->engine != NULL)
-		engine_commit(txn->engine, txn);
-	/*
-	 * The transaction is in the binary log. No action below
-	 * may throw. In case an error has happened, there is
-	 * no other option but terminate.
-	 */
-	if (txn->has_triggers &&
-	    trigger_run(&txn->on_commit, txn) != 0) {
-		diag_log();
-		unreachable();
-		panic("commit trigger failed");
-	}
-
-	fiber_set_txn(fiber(), NULL);
 	txn_free(txn);
 	return 0;
 fail:
@@ -463,18 +531,10 @@ txn_rollback(struct txn *txn)
 {
 	assert(txn == in_txn());
 	trigger_clear(&txn->fiber_on_stop);
-	if (txn->engine)
-		engine_rollback(txn->engine, txn);
-	/* Rollback triggers must not throw. */
-	if (txn->has_triggers &&
-	    trigger_run(&txn->on_rollback, txn) != 0) {
-		diag_log();
-		unreachable();
-		panic("rollback trigger failed");
-	}
-
-	fiber_set_txn(fiber(), NULL);
+	txn->signature = -1;
+	txn_complete(txn);
 	txn_free(txn);
+	fiber_set_txn(fiber(), NULL);
 }
 
 void
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index 098a01419eaa56a2304290eb9ce58fb93f86578b..bf50f5520f26f172a0c914d14d711a12d1608d1e 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -819,7 +819,8 @@ vy_log_tx_flush(struct vy_log_tx *tx)
 		tx_size++;
 
 	size_t used = region_used(&fiber()->gc);
-	struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc);
+	struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc,
+							NULL, NULL);
 	if (entry == NULL)
 		goto err;
 
diff --git a/src/box/wal.c b/src/box/wal.c
index 0ea15a432a4573f66774c54d35fdcd07955eca0f..4fa9beca0a5d7d06c835569380284cfed2cba0ef 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -260,8 +260,10 @@ tx_schedule_queue(struct stailq *queue)
 	 * are many ready fibers.
 	 */
 	struct journal_entry *req;
-	stailq_foreach_entry(req, queue, fifo)
+	stailq_foreach_entry(req, queue, fifo) {
+		journal_entry_complete(req);
 		fiber_wakeup(req->fiber);
+	}
 }
 
 /**
@@ -1131,7 +1133,9 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
 
-	ERROR_INJECT_RETURN(ERRINJ_WAL_IO);
+	ERROR_INJECT(ERRINJ_WAL_IO, {
+		goto fail;
+	});
 
 	if (! stailq_empty(&writer->rollback)) {
 		/*
@@ -1144,7 +1148,7 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 		say_error("Aborting transaction %llu during "
 			  "cascading rollback",
 			  vclock_sum(&writer->vclock));
-		return -1;
+		goto fail;
 	}
 
 	struct wal_msg *batch;
@@ -1158,7 +1162,7 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 		if (batch == NULL) {
 			diag_set(OutOfMemory, sizeof(struct wal_msg),
 				 "region", "struct wal_msg");
-			return -1;
+			goto fail;
 		}
 		wal_msg_create(batch);
 		/*
@@ -1182,6 +1186,11 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	fiber_yield(); /* Request was inserted. */
 	fiber_set_cancellable(cancellable);
 	return entry->res;
+
+fail:
+	entry->res = -1;
+	journal_entry_complete(entry);
+	return -1;
 }
 
 int64_t
@@ -1195,7 +1204,9 @@ wal_write_in_wal_mode_none(struct journal *journal,
 		       entry->rows + entry->n_rows);
 	vclock_merge(&writer->vclock, &vclock_diff);
 	vclock_copy(&replicaset.vclock, &writer->vclock);
-	return vclock_sum(&writer->vclock);
+	entry->res = vclock_sum(&writer->vclock);
+	journal_entry_complete(entry);
+	return entry->res;
 }
 
 void
diff --git a/src/lib/core/latch.h b/src/lib/core/latch.h
index 49c59cf63ac60bbf89c266afaeba61af9e528ef6..58094256423c567e514b2cca11a292308312d2e0 100644
--- a/src/lib/core/latch.h
+++ b/src/lib/core/latch.h
@@ -155,6 +155,17 @@ latch_trylock(struct latch *l)
 	return latch_lock_timeout(l, 0);
 }
 
+/**
+ * Make the current fiber the owner of an already locked latch,
+ * whichever fiber locked it. The latch must be locked.
+ */
+static inline void
+latch_steal(struct latch *l)
+{
+	assert(l->owner != NULL);
+	l->owner = fiber();
+}
+
 /**
  * \copydoc box_latch_unlock
  */