From ca0dac38e27cb9b13051a9d5de2e37668938e9f5 Mon Sep 17 00:00:00 2001 From: Vladimir Davydov <vdavydov.dev@gmail.com> Date: Wed, 5 Jul 2017 15:39:52 +0300 Subject: [PATCH] vinyl: extract quota adjustment from vy_tx and vy_index methods Currently, quota (and some global stats) are adjusted in vy_index_commit_upsert(), vy_tx_prepare(), and vy_tx_commit(), which entails dependency of vy_index and tx_manager on vy_env. Let's move this code to the upper level, i.e. to vy_prepare() and vy_commit(). Needed for #1906 --- src/box/vinyl.c | 109 ++++++++++++++++++++++++++---------------------- 1 file changed, 58 insertions(+), 51 deletions(-) diff --git a/src/box/vinyl.c b/src/box/vinyl.c index fd967ce38b..54402a8bba 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -566,6 +566,8 @@ struct vy_tx { * the version increments. */ uint32_t write_set_version; + /** Number of statements in the write set. */ + int write_count; /** * Total size of memory occupied by statements of * the write set. @@ -2096,12 +2098,10 @@ vy_index_commit_upsert(struct vy_index *index, struct vy_mem *mem, } assert(older == NULL || upserted_lsn != vy_stmt_lsn(older)); assert(vy_stmt_type(upserted) == IPROTO_REPLACE); - struct lsregion *allocator = &index->env->allocator; - - size_t mem_used_before = lsregion_used(allocator); const struct tuple *region_stmt = - vy_stmt_dup_lsregion(upserted, allocator, mem->generation); + vy_stmt_dup_lsregion(upserted, mem->allocator, + mem->generation); if (region_stmt == NULL) { /* OOM */ tuple_unref(upserted); @@ -2109,11 +2109,6 @@ vy_index_commit_upsert(struct vy_index *index, struct vy_mem *mem, return; } - size_t mem_used_after = lsregion_used(allocator); - assert(mem_used_after >= mem_used_before); - size_t mem_increment = mem_used_after - mem_used_before; - vy_quota_force_use(&index->env->quota, mem_increment); - int rc = vy_index_set(index, mem, upserted, ®ion_stmt); /** * Since we have already allocated mem statement and @@ -4594,6 +4589,7 @@ vy_tx_set(struct vy_tx *tx, struct vy_index *index, struct tuple *stmt) v->is_gap = false; write_set_insert(&tx->write_set, v); tx->write_set_version++; + tx->write_count++; tx->write_size += tuple_size(stmt); vy_stmt_counter_acct_tuple(&index->stat.txw.count, stmt); stailq_add_tail_entry(&tx->log, v, next_in_log); @@ -5734,7 +5730,6 @@ static int vy_tx_prepare(struct vy_tx *tx) { struct tx_manager *xm = tx->xm; - struct vy_env *env = xm->env; if (vy_tx_is_ro(tx)) { assert(tx->state == VINYL_TX_READY); @@ -5742,20 +5737,7 @@ vy_tx_prepare(struct vy_tx *tx) return 0; } - /* - * Reserve quota needed by the transaction before allocating - * memory. Since this may yield, which opens a time window for - * the transaction to be sent to read view or aborted, we call - * it before checking for conflicts. - */ - if (vy_quota_use(&env->quota, tx->write_size, - env->conf->timeout) != 0) { - diag_set(ClientError, ER_VY_QUOTA_TIMEOUT); - return -1; - } - if (vy_tx_is_in_read_view(tx) || tx->state == VINYL_TX_ABORT) { - vy_quota_release(&env->quota, tx->write_size); xm->stat.conflict++; diag_set(ClientError, ER_TRANSACTION_CONFLICT); return -1; @@ -5770,18 +5752,14 @@ vy_tx_prepare(struct vy_tx *tx) /** Send to read view read/write intersection */ for (struct txv *v = write_set_first(&tx->write_set); v != NULL; v = write_set_next(&tx->write_set, v)) { - if (vy_tx_send_to_read_view(tx, v)) { - vy_quota_release(&env->quota, tx->write_size); + if (vy_tx_send_to_read_view(tx, v)) return -1; - } } /* * Flush transactional changes to the index. * Sic: the loop below must not yield after recovery. */ - size_t mem_used_before = lsregion_used(&env->allocator); - int write_count = 0; /* repsert - REPLACE/UPSERT */ const struct tuple *delete = NULL, *repsert = NULL; MAYBE_UNUSED uint32_t current_space_id = 0; @@ -5815,31 +5793,9 @@ vy_tx_prepare(struct vy_tx *tx) if (rc != 0) break; v->region_stmt = *region_stmt; - write_count++; } - size_t mem_used_after = lsregion_used(&env->allocator); - assert(mem_used_after >= mem_used_before); - size_t write_size = mem_used_after - mem_used_before; - /* - * Insertion of a statement into an in-memory tree can trigger - * an allocation of a new tree block. This should not normally - * result in a noticeable excess of the memory limit, because - * most memory is occupied by statements anyway, but we need to - * adjust the quota accordingly in this case. - * - * The actual allocation size can also be less than reservation - * if a statement is allocated from an lsregion slab allocated - * by a previous transaction. Take this into account, too. - */ - if (write_size >= tx->write_size) - vy_quota_force_use(&env->quota, write_size - tx->write_size); - else - vy_quota_release(&env->quota, tx->write_size - write_size); if (rc != 0) return -1; - rmean_collect(env->stat->rmean, VY_STAT_TX, 1); - rmean_collect(env->stat->rmean, VY_STAT_TX_OPS, write_count); - rmean_collect(env->stat->rmean, VY_STAT_TX_WRITE, write_size); xm->last_prepared_tx = tx; return 0; } @@ -5937,13 +5893,64 @@ vy_begin(struct vy_env *e) int vy_prepare(struct vy_tx *tx) { - return vy_tx_prepare(tx); + struct vy_env *env = tx->xm->env; + + /* + * Reserve quota needed by the transaction before allocating + * memory. Since this may yield, which opens a time window for + * the transaction to be sent to read view or aborted, we call + * it before checking for conflicts. + */ + if (vy_quota_use(&env->quota, tx->write_size, + env->conf->timeout) != 0) { + diag_set(ClientError, ER_VY_QUOTA_TIMEOUT); + return -1; + } + + size_t mem_used_before = lsregion_used(&env->allocator); + + int rc = vy_tx_prepare(tx); + + size_t mem_used_after = lsregion_used(&env->allocator); + assert(mem_used_after >= mem_used_before); + size_t write_size = mem_used_after - mem_used_before; + /* + * Insertion of a statement into an in-memory tree can trigger + * an allocation of a new tree block. This should not normally + * result in a noticeable excess of the memory limit, because + * most memory is occupied by statements anyway, but we need to + * adjust the quota accordingly in this case. + * + * The actual allocation size can also be less than reservation + * if a statement is allocated from an lsregion slab allocated + * by a previous transaction. Take this into account, too. + */ + if (write_size >= tx->write_size) + vy_quota_force_use(&env->quota, write_size - tx->write_size); + else + vy_quota_release(&env->quota, tx->write_size - write_size); + + if (rc != 0) + return -1; + + rmean_collect(env->stat->rmean, VY_STAT_TX, 1); + rmean_collect(env->stat->rmean, VY_STAT_TX_OPS, tx->write_count); + rmean_collect(env->stat->rmean, VY_STAT_TX_WRITE, write_size); + return 0; } void vy_commit(struct vy_tx *tx, int64_t lsn) { + struct vy_env *env = tx->xm->env; + size_t mem_used_before = lsregion_used(&env->allocator); + vy_tx_commit(tx, lsn); + + size_t mem_used_after = lsregion_used(&env->allocator); + assert(mem_used_after >= mem_used_before); + vy_quota_force_use(&env->quota, mem_used_after - mem_used_before); + mempool_free(&tx->xm->tx_mempool, tx); } -- GitLab