diff --git a/src/box/vinyl.c b/src/box/vinyl.c index 530820e839c657148db30dd9552292cf39be81f8..a2e2a447436e7e08302538be8d4e27f013ea54d8 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -1253,7 +1253,48 @@ vy_is_committed(struct vy_env *env, struct space *space) } /** - * Get a vinyl tuple from the LSM tree by the key. + * Get a full tuple by a tuple read from a secondary index. + * @param lsm LSM tree from which the tuple was read. + * @param tx Current transaction. + * @param rv Read view. + * @param tuple Tuple read from a secondary index. + * @param[out] result The found tuple is stored here. Must be + * unreferenced after usage. + * + * @param 0 Success. + * @param -1 Memory error or read error. + */ +static int +vy_get_by_secondary_tuple(struct vy_lsm *lsm, struct vy_tx *tx, + const struct vy_read_view **rv, + struct tuple *tuple, struct tuple **result) +{ + assert(lsm->index_id > 0); + /* + * No need in vy_tx_track() as the tuple must already be + * tracked in the secondary index LSM tree. + */ + if (vy_point_lookup(lsm->pk, tx, rv, tuple, result) != 0) + return -1; + + if (*result == NULL) { + /* + * All indexes of a space must be consistent, i.e. + * if a tuple is present in one index, it must be + * present in all other indexes as well, so we can + * get here only if there's a bug somewhere in vinyl. + * Don't abort as core dump won't really help us in + * this case. Just warn the user and proceed to the + * next tuple. + */ + say_warn("%s: key %s missing in primary index", + vy_lsm_name(lsm), vy_stmt_str(tuple)); + } + return 0; +} + +/** + * Get a tuple from a vinyl space by key. * @param lsm LSM tree in which search. * @param tx Current transaction. * @param rv Read view. @@ -1264,10 +1305,10 @@ vy_is_committed(struct vy_env *env, struct space *space) * @param 0 Success. * @param -1 Memory error or read error. */ -static inline int -vy_lsm_get(struct vy_lsm *lsm, struct vy_tx *tx, - const struct vy_read_view **rv, - struct tuple *key, struct tuple **result) +static int +vy_get(struct vy_lsm *lsm, struct vy_tx *tx, + const struct vy_read_view **rv, + struct tuple *key, struct tuple **result) { /* * tx can be NULL, for example, if an user calls @@ -1275,21 +1316,74 @@ vy_lsm_get(struct vy_lsm *lsm, struct vy_tx *tx, */ assert(tx == NULL || tx->state == VINYL_TX_READY); + int rc; + struct tuple *tuple; + if (tuple_field_count(key) >= lsm->cmp_def->part_count) { + /* + * Use point lookup for a full key. + */ if (tx != NULL && vy_tx_track_point(tx, lsm, key) != 0) return -1; - return vy_point_lookup(lsm, tx, rv, key, result); + if (vy_point_lookup(lsm, tx, rv, key, &tuple) != 0) + return -1; + if (lsm->index_id > 0 && tuple != NULL) { + rc = vy_get_by_secondary_tuple(lsm, tx, rv, + tuple, result); + tuple_unref(tuple); + if (rc != 0) + return -1; + } else { + *result = tuple; + } + return 0; } struct vy_read_iterator itr; vy_read_iterator_open(&itr, lsm, tx, ITER_EQ, key, rv); - int rc = vy_read_iterator_next(&itr, result); - if (*result != NULL) - tuple_ref(*result); + while ((rc = vy_read_iterator_next(&itr, &tuple)) == 0) { + if (lsm->index_id == 0 || tuple == NULL) { + *result = tuple; + if (tuple != NULL) + tuple_ref(tuple); + break; + } + rc = vy_get_by_secondary_tuple(lsm, tx, rv, tuple, result); + if (rc != 0 || *result != NULL) + break; + } vy_read_iterator_close(&itr); return rc; } +/** + * Get a tuple from a vinyl space by raw key. + * @param lsm LSM tree in which search. + * @param tx Current transaction. + * @param rv Read view. + * @param key_raw MsgPack array of key fields. + * @param part_count Count of parts in the key. + * @param[out] result The found tuple is stored here. Must be + * unreferenced after usage. + * + * @param 0 Success. + * @param -1 Memory error or read error. + */ +static int +vy_get_by_raw_key(struct vy_lsm *lsm, struct vy_tx *tx, + const struct vy_read_view **rv, + const char *key_raw, uint32_t part_count, + struct tuple **result) +{ + struct tuple *key = vy_stmt_new_select(lsm->env->key_format, + key_raw, part_count); + if (key == NULL) + return -1; + int rc = vy_get(lsm, tx, rv, key, result); + tuple_unref(key); + return rc; +} + /** * Check if the LSM tree contains the key. If true, then set * a duplicate key error in the diagnostics area. @@ -1317,7 +1411,7 @@ vy_check_is_unique(struct vy_env *env, struct vy_tx *tx, */ if (env->status != VINYL_ONLINE) return 0; - if (vy_lsm_get(lsm, tx, rv, key, &found)) + if (vy_get(lsm, tx, rv, key, &found)) return -1; if (found) { @@ -1468,8 +1562,8 @@ vy_replace_one(struct vy_env *env, struct vy_tx *tx, struct space *space, * old tuple to pass it to the trigger. */ if (stmt != NULL && !rlist_empty(&space->on_replace)) { - if (vy_lsm_get(pk, tx, vy_tx_read_view(tx), - new_tuple, &stmt->old_tuple) != 0) + if (vy_get(pk, tx, vy_tx_read_view(tx), + new_tuple, &stmt->old_tuple) != 0) goto error_unref; } if (vy_tx_set(tx, pk, new_tuple)) @@ -1524,8 +1618,7 @@ vy_replace_impl(struct vy_env *env, struct vy_tx *tx, struct space *space, return -1; /* Get full tuple from the primary index. */ - if (vy_lsm_get(pk, tx, vy_tx_read_view(tx), - new_stmt, &old_stmt) != 0) + if (vy_get(pk, tx, vy_tx_read_view(tx), new_stmt, &old_stmt) != 0) goto error; if (old_stmt == NULL) { @@ -1623,51 +1716,6 @@ vy_unique_key_validate(struct vy_lsm *lsm, const char *key, return key_validate_parts(lsm->cmp_def, key, part_count, false); } -/** - * Find a tuple in the primary index LSM tree by the key of the - * specified LSM tree. - * @param lsm LSM tree for which the key is specified. - * Can be both primary and secondary. - * @param tx Current transaction. - * @param rv Read view. - * @param key_raw MessagePack'ed data, the array without a - * header. - * @param part_count Count of parts in the key. - * @param[out] result The found statement is stored here. Must be - * unreferenced after usage. - * - * @retval 0 Success. - * @retval -1 Memory error. - */ -static inline int -vy_lsm_full_by_key(struct vy_lsm *lsm, struct vy_tx *tx, - const struct vy_read_view **rv, - const char *key_raw, uint32_t part_count, - struct tuple **result) -{ - int rc; - struct tuple *key = vy_stmt_new_select(lsm->env->key_format, - key_raw, part_count); - if (key == NULL) - return -1; - struct tuple *found; - rc = vy_lsm_get(lsm, tx, rv, key, &found); - tuple_unref(key); - if (rc != 0) - return -1; - if (lsm->index_id == 0 || found == NULL) { - *result = found; - return 0; - } - /* - * No need in vy_tx_track() as the tuple is already - * tracked in the secondary index LSM tree. - */ - rc = vy_point_lookup(lsm->pk, tx, rv, found, result); - tuple_unref(found); - return rc; -} - /** * Delete the tuple from all LSM trees of the vinyl space. * @param env Vinyl environment. @@ -1750,8 +1798,8 @@ vy_delete(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt, * and pass them to indexes for deletion. */ if (has_secondary || !rlist_empty(&space->on_replace)) { - if (vy_lsm_full_by_key(lsm, tx, vy_tx_read_view(tx), - key, part_count, &stmt->old_tuple) != 0) + if (vy_get_by_raw_key(lsm, tx, vy_tx_read_view(tx), + key, part_count, &stmt->old_tuple) != 0) return -1; if (stmt->old_tuple == NULL) return 0; @@ -1832,8 +1880,8 @@ vy_update(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt, if (vy_unique_key_validate(lsm, key, part_count)) return -1; - if (vy_lsm_full_by_key(lsm, tx, vy_tx_read_view(tx), - key, part_count, &stmt->old_tuple) != 0) + if (vy_get_by_raw_key(lsm, tx, vy_tx_read_view(tx), + key, part_count, &stmt->old_tuple) != 0) return -1; /* Nothing to update. */ if (stmt->old_tuple == NULL) @@ -2106,8 +2154,7 @@ vy_upsert(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt, pk->key_def, pk->env->key_format); if (key == NULL) return -1; - int rc = vy_lsm_get(pk, tx, vy_tx_read_view(tx), - key, &stmt->old_tuple); + int rc = vy_get(pk, tx, vy_tx_read_view(tx), key, &stmt->old_tuple); tuple_unref(key); if (rc != 0) return -1; @@ -3809,28 +3856,13 @@ vinyl_iterator_secondary_next(struct iterator *base, struct tuple **ret) fiber_sleep(0.01); } #endif - /* - * Get the full tuple from the primary index. - * Note, there's no need in vy_tx_track() as the - * tuple is already tracked in the secondary index. - */ - if (vy_point_lookup(it->lsm->pk, it->tx, vy_tx_read_view(it->tx), - tuple, ret) != 0) + /* Get the full tuple from the primary index. */ + if (vy_get_by_secondary_tuple(it->lsm, it->tx, + vy_tx_read_view(it->tx), + tuple, ret) != 0) goto fail; - if (*ret == NULL) { - /* - * All indexes of a space must be consistent, i.e. - * if a tuple is present in one index, it must be - * present in all other indexes as well, so we can - * get here only if there's a bug somewhere in vinyl. - * Don't abort as core dump won't really help us in - * this case. Just warn the user and proceed to the - * next tuple. - */ - say_warn("%s: key %s missing in primary index", - vy_lsm_name(it->lsm), vy_stmt_str(tuple)); + if (*ret == NULL) goto next; - } tuple_bless(*ret); tuple_unref(*ret); return 0; @@ -3919,7 +3951,7 @@ vinyl_index_get(struct index *index, const char *key, const struct vy_read_view **rv = (tx != NULL ? vy_tx_read_view(tx) : &env->xm->p_global_read_view); - if (vy_lsm_full_by_key(lsm, tx, rv, key, part_count, ret) != 0) + if (vy_get_by_raw_key(lsm, tx, rv, key, part_count, ret) != 0) return -1; if (*ret != NULL) { tuple_bless(*ret);