diff --git a/changelogs/unreleased/gh-9917-speed-up-transaction-queue-processing.md b/changelogs/unreleased/gh-9917-speed-up-transaction-queue-processing.md new file mode 100644 index 0000000000000000000000000000000000000000..4bc8f67d21b5f1be7c193ec2a9d55a21046d633c --- /dev/null +++ b/changelogs/unreleased/gh-9917-speed-up-transaction-queue-processing.md @@ -0,0 +1,3 @@ +## bugfix/limbo + +* Optimized synchronous transaction queue processing (gh-9917). diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c index 46157af1fc9bf3d149c710d3633ab1696ae84658..8a9945bf5dacfd2ce96f1efdedbe446602d5c1d1 100644 --- a/src/box/txn_limbo.c +++ b/src/box/txn_limbo.c @@ -53,6 +53,7 @@ txn_limbo_create(struct txn_limbo *limbo) limbo->promote_greatest_term = 0; latch_create(&limbo->promote_latch); limbo->confirmed_lsn = 0; + limbo->entry_to_confirm = NULL; limbo->rollback_count = 0; limbo->is_in_rollback = false; limbo->svp_confirmed_lsn = -1; @@ -161,13 +162,17 @@ txn_limbo_append(struct txn_limbo *limbo, uint32_t id, struct txn *txn) size_t size; struct txn_limbo_entry *e = region_alloc_object(&txn->region, typeof(*e), &size); + if (limbo->entry_to_confirm == NULL && + txn_has_flag(txn, TXN_WAIT_ACK)) { + limbo->entry_to_confirm = e; + limbo->ack_count = 0; + } if (e == NULL) { diag_set(OutOfMemory, size, "region_alloc_object", "e"); return NULL; } e->txn = txn; e->lsn = -1; - e->ack_count = 0; e->is_commit = false; e->is_rollback = false; rlist_add_tail_entry(&limbo->queue, e, in_queue); @@ -200,6 +205,8 @@ void txn_limbo_abort(struct txn_limbo *limbo, struct txn_limbo_entry *entry) { entry->is_rollback = true; + if (entry == limbo->entry_to_confirm) + limbo->entry_to_confirm = NULL; /* * The simple rule about rollback/commit order applies * here as well: commit always in the order of WAL write, @@ -228,6 +235,9 @@ txn_limbo_assign_remote_lsn(struct txn_limbo *limbo, entry->lsn = lsn; } +static void +txn_limbo_confirm(struct txn_limbo *limbo); + void txn_limbo_assign_local_lsn(struct txn_limbo *limbo, struct txn_limbo_entry *entry, int64_t lsn) @@ -238,19 +248,8 @@ txn_limbo_assign_local_lsn(struct txn_limbo *limbo, assert(lsn > 0); entry->lsn = lsn; - /* - * The entry just got its LSN after a WAL write. It could - * happen that this LSN was already ACKed by some - * replicas. Update the ACK counter to take them into - * account. - */ - struct vclock_iterator iter; - vclock_iterator_init(&iter, &limbo->vclock); - int ack_count = 0; - vclock_foreach(&iter, vc) - ack_count += vc.lsn >= lsn; - assert(ack_count >= entry->ack_count); - entry->ack_count = ack_count; + if (entry == limbo->entry_to_confirm) + limbo->ack_count = vclock_count_ge(&limbo->vclock, entry->lsn); } void @@ -593,6 +592,7 @@ txn_limbo_read_promote(struct txn_limbo *limbo, uint32_t replica_id, limbo->owner_id = replica_id; limbo->confirmed_lsn = vclock_get(&limbo->confirmed_vclock, replica_id); + limbo->entry_to_confirm = NULL; box_update_ro_summary(); } @@ -629,6 +629,56 @@ txn_limbo_read_demote(struct txn_limbo *limbo, int64_t lsn) return txn_limbo_read_promote(limbo, REPLICA_ID_NIL, lsn); } +/** + * Check that some synchronous transactions have gathered quorum and + * write a confirmation entry of the last confirmed transaction. + */ +static void +txn_limbo_confirm(struct txn_limbo *limbo) +{ + assert(limbo->owner_id == instance_id); + if (limbo->is_in_rollback) + return; + if (limbo->entry_to_confirm == NULL || + limbo->entry_to_confirm->lsn == -1) + return; + if (limbo->ack_count < replication_synchro_quorum) + return; + int32_t k = (int32_t)vclock_size(&limbo->vclock) + - replication_synchro_quorum; + /** + * limbo->ack_count >= replication_synchro_quorum => + * vclock_size(&limbo->vclock) >= replication_synchro_quorum + */ + assert(k >= 0); + int64_t confirm_lsn = vclock_nth_element(&limbo->vclock, k); + assert(confirm_lsn >= limbo->entry_to_confirm->lsn); + struct txn_limbo_entry *e = limbo->entry_to_confirm; + limbo->entry_to_confirm = NULL; + int64_t max_assigned_lsn = -1; + for (; !rlist_entry_is_head(e, &limbo->queue, in_queue); + e = rlist_next_entry(e, in_queue)) { + if (!txn_has_flag(e->txn, TXN_WAIT_ACK)) + continue; + if (e->lsn == -1 || e->lsn > confirm_lsn) { + limbo->entry_to_confirm = e; + /** + * It may be that a quorum has been gathered, but + * ack_count = 0. It's ok. CONFIRM will be written as + * soon as the lsn is assigned to the transaction. + */ + limbo->ack_count = (e->lsn == -1) ? 0 : + vclock_count_ge(&limbo->vclock, e->lsn); + break; + } else { + max_assigned_lsn = e->lsn; + } + } + assert(max_assigned_lsn != -1); + txn_limbo_write_confirm(limbo, max_assigned_lsn); + txn_limbo_read_confirm(limbo, max_assigned_lsn); +} + void txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn) { @@ -651,9 +701,10 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn) return; assert(limbo->owner_id != REPLICA_ID_NIL); int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id); + assert(lsn >= prev_lsn); /* * One of the reasons why can happen - the remote instance is not - * read-only and wrote something under its own insance_id. For qsync + * read-only and wrote something under its own instance_id. For qsync * that most likely means that the remote instance decided to take over * the limbo ownership, and the current node is going to become a * replica very soon. @@ -661,31 +712,15 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn) if (lsn == prev_lsn) return; vclock_follow(&limbo->vclock, replica_id, lsn); - struct txn_limbo_entry *e; - int64_t confirm_lsn = -1; - rlist_foreach_entry(e, &limbo->queue, in_queue) { - assert(e->ack_count <= VCLOCK_MAX); - if (e->lsn > lsn) - break; - /* - * Sync transactions need to collect acks. Async - * transactions are automatically committed right - * after all the previous sync transactions are. - */ - if (!txn_has_flag(e->txn, TXN_WAIT_ACK)) { - continue; - } else if (e->lsn <= prev_lsn) { - continue; - } else if (++e->ack_count < replication_synchro_quorum) { - continue; - } else { - confirm_lsn = e->lsn; - } + + if (limbo->entry_to_confirm != NULL && + limbo->entry_to_confirm->lsn != -1) { + if (limbo->entry_to_confirm->lsn <= prev_lsn || + lsn < limbo->entry_to_confirm->lsn) + return; + if (++limbo->ack_count >= replication_synchro_quorum) + txn_limbo_confirm(limbo); } - if (confirm_lsn == -1 || confirm_lsn <= limbo->confirmed_lsn) - return; - txn_limbo_write_confirm(limbo, confirm_lsn); - txn_limbo_read_confirm(limbo, confirm_lsn); } /** @@ -1225,23 +1260,9 @@ txn_limbo_on_parameters_change(struct txn_limbo *limbo) { if (rlist_empty(&limbo->queue) || txn_limbo_is_frozen(limbo)) return; - struct txn_limbo_entry *e; - int64_t confirm_lsn = -1; - rlist_foreach_entry(e, &limbo->queue, in_queue) { - assert(e->ack_count <= VCLOCK_MAX); - if (!txn_has_flag(e->txn, TXN_WAIT_ACK)) { - continue; - } else if (e->ack_count < replication_synchro_quorum) { - continue; - } else { - confirm_lsn = e->lsn; - assert(confirm_lsn > 0); - } - } - if (confirm_lsn > limbo->confirmed_lsn && !limbo->is_in_rollback) { - txn_limbo_write_confirm(limbo, confirm_lsn); - txn_limbo_read_confirm(limbo, confirm_lsn); - } + /* The replication_synchro_quorum value may have changed. */ + if (limbo->owner_id == instance_id) + txn_limbo_confirm(limbo); /* * Wakeup all the others - timed out will rollback. Also * there can be non-transactional waiters, such as CONFIRM diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h index ce93c9d10c100476addf324a474d81ae07423085..0da59d176b00c26b38980f4dca79d8f7a0fde815 100644 --- a/src/box/txn_limbo.h +++ b/src/box/txn_limbo.h @@ -57,11 +57,6 @@ struct txn_limbo_entry { * written to WAL yet. */ int64_t lsn; - /** - * Number of ACKs. Or in other words - how many replicas - * confirmed receipt of the transaction. - */ - int ack_count; /** * Result flags. Only one of them can be true. But both * can be false if the transaction is still waiting for @@ -166,6 +161,19 @@ struct txn_limbo { * illegal. */ int64_t confirmed_lsn; + /** + * The first unconfirmed synchronous transaction in the current term. + * Is NULL if there is no such transaction, or if the current instance + * does not own limbo. + */ + struct txn_limbo_entry *entry_to_confirm; + /** + * Number of ACKs of the first unconfirmed synchronous transaction + * (entry_to_confirm->txn). Contains the actual value only for a + * non-NULL entry_to_confirm with a local lsn assigned. Otherwise + * it may contain any trash. + */ + int ack_count; /** * Total number of performed rollbacks. It used as a guard * to do some actions assuming all limbo transactions will