diff --git a/src/box/vinyl.c b/src/box/vinyl.c index 3ef86da352aabefa30bf5ea026b62213c6e80afa..4651b8bbbe97d3e61e5945e499d412fe184b8697 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -371,7 +371,12 @@ vy_file_seek(struct vy_file *f, uint64_t off) } struct vy_buf { - char *s, *p, *e; + /** Start of the allocated buffer */ + char *s; + /** End of the used area */ + char *p; + /** End of the buffer */ + char *e; }; static inline void @@ -3234,8 +3239,8 @@ static struct svif svtuple_if = }; struct sxv { - uint64_t id; - uint32_t lo; + uint64_t tsn; + int log_read; uint64_t csn; struct tx_index *index; struct vinyl_tuple *tuple; @@ -3252,8 +3257,8 @@ sxv_alloc(struct vinyl_tuple *tuple) if (unlikely(v == NULL)) return NULL; v->index = NULL; - v->id = 0; - v->lo = 0; + v->tsn = 0; + v->log_read = 0; v->csn = 0; v->tuple = tuple; v->next = NULL; @@ -3280,10 +3285,10 @@ sxv_freeall(struct runtime *r, struct sxv *v) } static inline struct sxv * -sxv_match(struct sxv *head, uint64_t id) +sxv_match(struct sxv *head, uint64_t tsn) { for (struct sxv *c = head; c != NULL; c = c->next) - if (c->id == id) + if (c->tsn == tsn) return c; return NULL; } @@ -3324,15 +3329,15 @@ sxv_unlink(struct sxv *v) static inline void sxv_commit(struct sxv *v, uint32_t csn) { - v->id = UINT64_MAX; - v->lo = UINT32_MAX; + v->tsn = UINT64_MAX; + v->log_read = INT_MAX; v->csn = csn; } static inline int sxv_committed(struct sxv *v) { - return v->id == UINT64_MAX && v->lo == UINT32_MAX; + return v->tsn == UINT64_MAX && v->log_read == INT_MAX; } static inline void @@ -3432,7 +3437,7 @@ struct vinyl_tx { uint64_t start; enum tx_type type; enum tx_state state; - uint64_t id; + uint64_t tsn; /** * Consistent read view LSN: the LSN recorded * at start of transaction and used to implement @@ -3440,7 +3445,7 @@ struct vinyl_tx { */ uint64_t vlsn; uint64_t csn; - int log_read; + int log_read; struct rlist deadlock; rb_node(struct vinyl_tx) tree_node; struct tx_manager *manager; @@ -3452,14 +3457,14 @@ static int tx_tree_cmp(tx_tree_t *rbtree, struct vinyl_tx *a, struct vinyl_tx *b) { (void)rbtree; - return vy_cmp(a->id, b->id); + return vy_cmp(a->tsn, b->tsn); } static int tx_tree_key_cmp(tx_tree_t *rbtree, const char *a, struct vinyl_tx *b) { (void)rbtree; - return vy_cmp(load_u64(a), b->id); + return vy_cmp(load_u64(a), b->tsn); } rb_gen_ext_key(, tx_tree_, tx_tree_t, struct vinyl_tx, tree_node, @@ -3559,26 +3564,26 @@ static uint64_t tx_manager_min(struct tx_manager *m) { tt_pthread_mutex_lock(&m->lock); - uint64_t id = 0; + uint64_t min_tsn = 0; if (tx_manager_count(m) > 0) { struct vinyl_tx *min = tx_tree_first(&m->tree); - id = min->id; + min_tsn = min->tsn; } tt_pthread_mutex_unlock(&m->lock); - return id; + return min_tsn; } static uint64_t tx_manager_max(struct tx_manager *m) { tt_pthread_mutex_lock(&m->lock); - uint64_t id = 0; + uint64_t max_tsn = 0; if (tx_manager_count(m) > 0) { struct vinyl_tx *max = tx_tree_last(&m->tree); - id = max->id; + max_tsn = max->tsn; } tt_pthread_mutex_unlock(&m->lock); - return id; + return max_tsn; } static uint64_t @@ -3622,7 +3627,7 @@ tx_begin(struct tx_manager *m, struct vinyl_tx *tx, enum tx_type type) vy_sequence_lock(m->r->seq); tx->csn = m->csn; - tx->id = vy_sequence_do(m->r->seq, VINYL_TSN_NEXT); + tx->tsn = vy_sequence_do(m->r->seq, VINYL_TSN_NEXT); tx->vlsn = vy_sequence_do(m->r->seq, VINYL_LSN); vy_sequence_unlock(m->r->seq); @@ -3784,7 +3789,7 @@ tx_prepare(struct vinyl_tx *tx, struct sicache *cache, enum vinyl_status status) { struct svlogv *lv = vy_bufiter_get(&i); struct sxv *v = lv->v.v; - if ((int)v->lo == tx->log_read) + if (v->log_read == tx->log_read) break; if (sxv_aborted(v)) return tx_promote(tx, VINYL_TX_ROLLBACK); @@ -3810,12 +3815,13 @@ tx_prepare(struct vinyl_tx *tx, struct sicache *cache, enum vinyl_status status) } static int -tx_set(struct vinyl_tx *x, struct tx_index *index, struct vinyl_tuple *version) +tx_set(struct vinyl_tx *tx, struct tx_index *index, + struct vinyl_tuple *version) { - struct tx_manager *m = x->manager; + struct tx_manager *m = tx->manager; struct runtime *r = m->r; if (! (version->flags & SVGET)) { - x->log_read = -1; + tx->log_read = -1; } /* allocate mvcc container */ struct sxv *v = sxv_alloc(version); @@ -3824,7 +3830,7 @@ tx_set(struct vinyl_tx *x, struct tx_index *index, struct vinyl_tuple *version) vinyl_tuple_unref_rt(r, version); return -1; } - v->id = x->id; + v->tsn = tx->tsn; v->index = index; struct svlogv lv; lv.space_id = index->key_def->space_id; @@ -3836,8 +3842,8 @@ tx_set(struct vinyl_tx *x, struct tx_index *index, struct vinyl_tuple *version) v->tuple->data, v->tuple->size); if (unlikely(head == NULL)) { /* unique */ - v->lo = sv_logcount(&x->log); - int rc = sv_logadd(&x->log, &lv, index->index); + v->log_read = sv_logcount(&tx->log); + int rc = sv_logadd(&tx->log, &lv, index->index); if (unlikely(rc == -1)) { vy_oom(); goto error; @@ -3848,7 +3854,7 @@ tx_set(struct vinyl_tx *x, struct tx_index *index, struct vinyl_tuple *version) } /* exists */ /* match previous update made by current transaction */ - struct sxv *own = sxv_match(head, x->id); + struct sxv *own = sxv_match(head, tx->tsn); if (unlikely(own)) { if (unlikely(version->flags & SVUPSERT)) { @@ -3857,8 +3863,8 @@ tx_set(struct vinyl_tx *x, struct tx_index *index, struct vinyl_tuple *version) goto error; } /* replace old document with the new one */ - lv.next = sv_logat(&x->log, own->lo)->next; - v->lo = own->lo; + lv.next = sv_logat(&tx->log, own->log_read)->next; + v->log_read = own->log_read; if (unlikely(sxv_aborted(own))) sxv_abort(v); sxv_replace(own, v); @@ -3867,7 +3873,7 @@ tx_set(struct vinyl_tx *x, struct tx_index *index, struct vinyl_tuple *version) sxv_tree_insert(&index->tree, v); } /* update log */ - sv_logreplace(&x->log, v->lo, &lv); + sv_logreplace(&tx->log, v->log_read, &lv); vy_quota_op(r->quota, VINYL_QREMOVE, vinyl_tuple_size(own->tuple)); sxv_free(r, own); @@ -3875,8 +3881,8 @@ tx_set(struct vinyl_tx *x, struct tx_index *index, struct vinyl_tuple *version) return 0; } /* update log */ - v->lo = sv_logcount(&x->log); - int rc = sv_logadd(&x->log, &lv, index->index); + v->log_read = sv_logcount(&tx->log); + int rc = sv_logadd(&tx->log, &lv, index->index); if (unlikely(rc == -1)) { vy_oom(); goto error; @@ -3893,7 +3899,7 @@ tx_set(struct vinyl_tx *x, struct tx_index *index, struct vinyl_tuple *version) } static int -tx_get(struct vinyl_tx *x, struct tx_index *index, struct vinyl_tuple *key, +tx_get(struct vinyl_tx *tx, struct tx_index *index, struct vinyl_tuple *key, struct vinyl_tuple **result) { tt_pthread_mutex_lock(&index->mutex); @@ -3901,7 +3907,7 @@ tx_get(struct vinyl_tx *x, struct tx_index *index, struct vinyl_tuple *key, key->data, key->size); if (head == NULL) goto add; - struct sxv *v = sxv_match(head, x->id); + struct sxv *v = sxv_match(head, tx->tsn); if (v == NULL) goto add; tt_pthread_mutex_unlock(&index->mutex); @@ -3917,11 +3923,11 @@ tx_get(struct vinyl_tx *x, struct tx_index *index, struct vinyl_tuple *key, add: /* track a start of the latest read sequence in the * transactional log */ - if (x->log_read == -1) - x->log_read = sv_logcount(&x->log); + if (tx->log_read == -1) + tx->log_read = sv_logcount(&tx->log); tt_pthread_mutex_unlock(&index->mutex); vinyl_tuple_ref(key); - int rc = tx_set(x, index, key); + int rc = tx_set(tx, index, key); if (unlikely(rc == -1)) return -1; return 0; @@ -3943,7 +3949,7 @@ tx_deadlock_in(struct tx_manager *m, struct rlist *mark, struct vinyl_tx *t, if (v->prev == NULL) continue; do { - struct vinyl_tx *n = tx_manager_find(m, v->id); + struct vinyl_tx *n = tx_manager_find(m, v->tsn); assert(n != NULL); if (unlikely(n == t)) return 1; @@ -3980,7 +3986,7 @@ static int __attribute__((unused)) tx_deadlock(struct vinyl_tx *t) vy_bufiter_next(&i); continue; } - struct vinyl_tx *p = tx_manager_find(m, v->prev->id); + struct vinyl_tx *p = tx_manager_find(m, v->prev->tsn); assert(p != NULL); int rc = tx_deadlock_in(m, &mark, t, p); if (unlikely(rc)) { @@ -5502,8 +5508,8 @@ struct vinyl_index { struct vinyl_env *env; struct vy_profiler rtp; struct tx_index coindex; - uint64_t txn_min; - uint64_t txn_max; + uint64_t tsn_min; + uint64_t tsn_max; vy_range_tree_t tree; struct vy_status status; @@ -10161,7 +10167,7 @@ vinyl_index_close(struct vinyl_index *index) if (unlikely(! vy_status_is_active(status))) return -1; /* set last visible transaction id */ - index->txn_max = tx_manager_max(&e->xm); + index->tsn_max = tx_manager_max(&e->xm); vy_status_set(&index->status, VINYL_SHUTDOWN_PENDING); if (e->status == VINYL_SHUTDOWN || e->status == VINYL_OFFLINE) { return vinyl_index_delete(index); @@ -10181,7 +10187,7 @@ vinyl_index_drop(struct vinyl_index *index) if (unlikely(rc == -1)) return -1; /* set last visible transaction id */ - index->txn_max = tx_manager_max(&e->xm); + index->tsn_max = tx_manager_max(&e->xm); vy_status_set(&index->status, VINYL_DROP_PENDING); if (e->status == VINYL_SHUTDOWN || e->status == VINYL_OFFLINE) return vinyl_index_delete(index); @@ -10359,8 +10365,8 @@ vinyl_index_new(struct vinyl_env *e, struct key_def *key_def) index->refs = 1; vy_status_set(&index->status, VINYL_OFFLINE); tx_index_init(&index->coindex, index, index->key_def); - index->txn_min = tx_manager_min(&e->xm); - index->txn_max = UINT32_MAX; + index->tsn_min = tx_manager_min(&e->xm); + index->tsn_max = UINT64_MAX; rlist_add(&e->indexes, &index->link); return index; @@ -10414,9 +10420,9 @@ vinyl_index_by_name(struct vinyl_env *e, const char *name) return NULL; } -static int vinyl_index_visible(struct vinyl_index *index, uint64_t txn) +static int vinyl_index_visible(struct vinyl_index *index, uint64_t tsn) { - return txn > index->txn_min && txn <= index->txn_max; + return tsn > index->tsn_min && tsn <= index->tsn_max; } size_t @@ -10711,7 +10717,7 @@ vy_tx_write(struct vinyl_tx *t, struct vinyl_index *index, switch (status) { case VINYL_SHUTDOWN_PENDING: case VINYL_DROP_PENDING: - if (unlikely(! vinyl_index_visible(index, t->id))) { + if (unlikely(! vinyl_index_visible(index, t->tsn))) { vy_error("%s", "index is invisible for the transaction"); return -1; } @@ -10897,6 +10903,7 @@ vinyl_prepare(struct vinyl_env *e, struct vinyl_tx *tx) /* prepare transaction */ assert(tx->state == VINYL_TX_READY); + struct sicache *cache = vy_cachepool_pop(&e->cachepool); if (unlikely(cache == NULL)) return vy_oom(); @@ -10919,7 +10926,7 @@ vinyl_prepare(struct vinyl_env *e, struct vinyl_tx *tx) { struct svlogv *lv = vy_bufiter_get(&i); struct sxv *v = lv->v.v; - if ((int)v->lo == tx->log_read) + if (v->log_read == tx->log_read) break; /* abort conflict reader */ if (v->prev && !sxv_committed(v->prev)) {