diff --git a/extra/dist/tarantoolctl.in b/extra/dist/tarantoolctl.in
index edda76d2deeda03de317f90547f825f426589e7b..920b60e853a428996827b76c12873224ebfbffdb 100755
--- a/extra/dist/tarantoolctl.in
+++ b/extra/dist/tarantoolctl.in
@@ -928,6 +928,8 @@ local function rocks()
         remove = "luarocks.remove",
         show = "luarocks.show",
         make = "luarocks.make",
+        pack = "luarocks.pack",
+        unpack = "luarocks.unpack",
     }
     rawset(_G, 'commands', commands)
diff --git a/src/box/txn.c b/src/box/txn.c
index a35770cff8f07ff8f08354351f74e53434345e22..617ceb8a2fdcce27edbed3675823cec5a46cb36c 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -411,8 +411,7 @@ txn_abort(struct txn *txn)
 int
 txn_check_singlestatement(struct txn *txn, const char *where)
 {
-	if (!txn->is_autocommit ||
-	    stailq_last(&txn->stmts) != stailq_first(&txn->stmts)) {
+	if (!txn->is_autocommit || !txn_is_first_statement(txn)) {
 		diag_set(ClientError, ER_UNSUPPORTED, where,
 			 "multi-statement transactions");
 		return -1;
diff --git a/src/box/txn.h b/src/box/txn.h
index a9f68ed65062b4a2d126303a22c2444a10381794..e74c14d409d4a73a8c29fb32d2f9c3ad9bba7534 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -302,6 +302,17 @@ txn_rollback_stmt();
 int
 txn_check_singlestatement(struct txn *txn, const char *where);
 
+/**
+ * Returns true if the transaction has a single statement.
+ * Supposed to be used from a space on_replace trigger to
+ * detect transaction boundaries.
+ */
+static inline bool
+txn_is_first_statement(struct txn *txn)
+{
+	return stailq_last(&txn->stmts) == stailq_first(&txn->stmts);
+}
+
 /** The current statement of the transaction. */
 static inline struct txn_stmt *
 txn_current_stmt(struct txn *txn)
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 6e24071d3956840858a0af4ab1b4ed1aa1d92d48..96efaa00a04de6d663b5cfeb7ee7c4b3f4cc2a67 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1254,7 +1254,52 @@ vy_is_committed(struct vy_env *env, struct space *space)
 }
 
 /**
- * Get a vinyl tuple from the LSM tree by the key.
+ * Get a full tuple by a tuple read from a secondary index.
+ * @param lsm LSM tree from which the tuple was read.
+ * @param tx Current transaction.
+ * @param rv Read view.
+ * @param tuple Tuple read from a secondary index.
+ * @param[out] result The found tuple is stored here. Must be
+ *             unreferenced after usage.
+ *
+ * @retval 0 Success.
+ * @retval -1 Memory error or read error.
+ */
+static int
+vy_get_by_secondary_tuple(struct vy_lsm *lsm, struct vy_tx *tx,
+			  const struct vy_read_view **rv,
+			  struct tuple *tuple, struct tuple **result)
+{
+	assert(lsm->index_id > 0);
+	/*
+	 * No need in vy_tx_track() as the tuple must already be
+	 * tracked in the secondary index LSM tree.
+	 */
+	if (vy_point_lookup(lsm->pk, tx, rv, tuple, result) != 0)
+		return -1;
+
+	if (*result == NULL) {
+		/*
+		 * All indexes of a space must be consistent, i.e.
+		 * if a tuple is present in one index, it must be
+		 * present in all other indexes as well, so we can
+		 * get here only if there's a bug somewhere in vinyl.
+		 * Don't abort as core dump won't really help us in
+		 * this case. Just warn the user and proceed to the
+		 * next tuple.
+		 */
+		say_warn("%s: key %s missing in primary index",
+			 vy_lsm_name(lsm), vy_stmt_str(tuple));
+	}
+
+	if ((*rv)->vlsn == INT64_MAX)
+		vy_cache_add(&lsm->pk->cache, *result, NULL, tuple, ITER_EQ);
+
+	return 0;
+}
+
+/**
+ * Get a tuple from a vinyl space by key.
  * @param lsm LSM tree in which search.
  * @param tx Current transaction.
  * @param rv Read view.
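[Editorial aside, not part of the patch: the comment on txn_is_first_statement() in the txn.h hunk above says it is supposed to be used from a space on_replace trigger to detect transaction boundaries. A minimal sketch of that usage follows; it assumes the trigger callback convention from trigger.h and that on_replace triggers receive the current transaction as the event. The name my_on_replace and the per-transaction setup are hypothetical.]

static void
my_on_replace(struct trigger *trigger, void *event)
{
	(void)trigger;
	/* For space on_replace triggers the event is the transaction. */
	struct txn *txn = (struct txn *)event;
	if (txn_is_first_statement(txn)) {
		/*
		 * First statement of the transaction: this is the
		 * place to set up per-transaction state exactly once.
		 */
	}
	/* Per-statement processing goes here. */
}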
@@ -1265,10 +1310,10 @@ vy_is_committed(struct vy_env *env, struct space *space)
  * @param 0 Success.
  * @param -1 Memory error or read error.
  */
-static inline int
-vy_lsm_get(struct vy_lsm *lsm, struct vy_tx *tx,
-	   const struct vy_read_view **rv,
-	   struct tuple *key, struct tuple **result)
+static int
+vy_get(struct vy_lsm *lsm, struct vy_tx *tx,
+       const struct vy_read_view **rv,
+       struct tuple *key, struct tuple **result)
 {
 	/*
 	 * tx can be NULL, for example, if an user calls
@@ -1276,52 +1321,112 @@ vy_lsm_get(struct vy_lsm *lsm, struct vy_tx *tx,
 	 */
 	assert(tx == NULL || tx->state == VINYL_TX_READY);
 
+	int rc;
+	struct tuple *tuple;
+
 	if (tuple_field_count(key) >= lsm->cmp_def->part_count) {
+		/*
+		 * Use point lookup for a full key.
+		 */
 		if (tx != NULL && vy_tx_track_point(tx, lsm, key) != 0)
 			return -1;
-		return vy_point_lookup(lsm, tx, rv, key, result);
+		if (vy_point_lookup(lsm, tx, rv, key, &tuple) != 0)
+			return -1;
+		if (lsm->index_id > 0 && tuple != NULL) {
+			rc = vy_get_by_secondary_tuple(lsm, tx, rv,
+						       tuple, result);
+			tuple_unref(tuple);
+			if (rc != 0)
+				return -1;
+		} else {
+			*result = tuple;
+		}
+		if ((*rv)->vlsn == INT64_MAX)
+			vy_cache_add(&lsm->cache, *result, NULL, key, ITER_EQ);
+		return 0;
 	}
 
 	struct vy_read_iterator itr;
 	vy_read_iterator_open(&itr, lsm, tx, ITER_EQ, key, rv);
-	int rc = vy_read_iterator_next(&itr, result);
-	if (*result != NULL)
-		tuple_ref(*result);
+	while ((rc = vy_read_iterator_next(&itr, &tuple)) == 0) {
+		if (lsm->index_id == 0 || tuple == NULL) {
+			*result = tuple;
+			if (tuple != NULL)
+				tuple_ref(tuple);
+			break;
+		}
+		rc = vy_get_by_secondary_tuple(lsm, tx, rv, tuple, result);
+		if (rc != 0 || *result != NULL)
+			break;
+	}
+	if (rc == 0)
+		vy_read_iterator_cache_add(&itr, *result);
 	vy_read_iterator_close(&itr);
 	return rc;
 }
 
 /**
- * Check if the LSM tree contains the key. If true, then set
- * a duplicate key error in the diagnostics area.
+ * Get a tuple from a vinyl space by raw key.
+ * @param lsm LSM tree in which search.
+ * @param tx Current transaction.
+ * @param rv Read view.
+ * @param key_raw MsgPack array of key fields.
+ * @param part_count Count of parts in the key.
+ * @param[out] result The found tuple is stored here. Must be
+ *             unreferenced after usage.
+ *
+ * @retval 0 Success.
+ * @retval -1 Memory error or read error.
+ */
+static int
+vy_get_by_raw_key(struct vy_lsm *lsm, struct vy_tx *tx,
+		  const struct vy_read_view **rv,
+		  const char *key_raw, uint32_t part_count,
+		  struct tuple **result)
+{
+	struct tuple *key = vy_stmt_new_select(lsm->env->key_format,
+					       key_raw, part_count);
+	if (key == NULL)
+		return -1;
+	int rc = vy_get(lsm, tx, rv, key, result);
+	tuple_unref(key);
+	return rc;
+}
+
+/**
+ * Check if insertion of a new tuple violates unique constraint
+ * of the primary index.
  * @param env Vinyl environment.
  * @param tx Current transaction.
  * @param rv Read view.
  * @param space_name Space name.
  * @param index_name Index name.
- * @param lsm LSM tree in which to search.
- * @param key Key statement.
+ * @param lsm LSM tree corresponding to the index.
+ * @param stmt New tuple.
  *
- * @retval 0 Success, the key isn't found.
- * @retval -1 Memory error or the key is found.
+ * @retval 0 Success, unique constraint is satisfied.
+ * @retval -1 Duplicate is found or read error occurred.
*/ static inline int -vy_check_is_unique(struct vy_env *env, struct vy_tx *tx, - const struct vy_read_view **rv, - const char *space_name, const char *index_name, - struct vy_lsm *lsm, struct tuple *key) +vy_check_is_unique_primary(struct vy_env *env, struct vy_tx *tx, + const struct vy_read_view **rv, + const char *space_name, const char *index_name, + struct vy_lsm *lsm, struct tuple *stmt) { - struct tuple *found; + assert(lsm->index_id == 0); + assert(vy_stmt_type(stmt) == IPROTO_INSERT); /* * During recovery we apply rows that were successfully * applied before restart so no conflict is possible. */ if (env->status != VINYL_ONLINE) return 0; - if (vy_lsm_get(lsm, tx, rv, key, &found)) + if (!lsm->check_is_unique) + return 0; + struct tuple *found; + if (vy_get(lsm, tx, rv, stmt, &found)) return -1; - - if (found) { + if (found != NULL) { tuple_unref(found); diag_set(ClientError, ER_TUPLE_FOUND, index_name, space_name); @@ -1351,19 +1456,36 @@ vy_check_is_unique_secondary(struct vy_env *env, struct vy_tx *tx, struct vy_lsm *lsm, const struct tuple *stmt) { assert(lsm->index_id > 0); - struct key_def *def = lsm->key_def; - if (lsm->check_is_unique && - !key_update_can_be_skipped(def->column_mask, - vy_stmt_column_mask(stmt)) && - (!def->is_nullable || !vy_tuple_key_contains_null(stmt, def))) { - struct tuple *key = vy_stmt_extract_key(stmt, def, - lsm->env->key_format); - if (key == NULL) - return -1; - int rc = vy_check_is_unique(env, tx, rv, space_name, - index_name, lsm, key); - tuple_unref(key); - return rc; + assert(vy_stmt_type(stmt) == IPROTO_INSERT || + vy_stmt_type(stmt) == IPROTO_REPLACE); + /* + * During recovery we apply rows that were successfully + * applied before restart so no conflict is possible. + */ + if (env->status != VINYL_ONLINE) + return 0; + if (!lsm->check_is_unique) + return 0; + if (key_update_can_be_skipped(lsm->key_def->column_mask, + vy_stmt_column_mask(stmt))) + return 0; + if (lsm->key_def->is_nullable && + vy_tuple_key_contains_null(stmt, lsm->key_def)) + return 0; + struct tuple *key = vy_stmt_extract_key(stmt, lsm->key_def, + lsm->env->key_format); + if (key == NULL) + return -1; + struct tuple *found; + int rc = vy_get(lsm, tx, rv, key, &found); + tuple_unref(key); + if (rc != 0) + return -1; + if (found != NULL) { + tuple_unref(found); + diag_set(ClientError, ER_TUPLE_FOUND, + index_name, space_name); + return -1; } return 0; } @@ -1390,10 +1512,10 @@ vy_insert_primary(struct vy_env *env, struct vy_tx *tx, struct space *space, * A primary index is always unique and the new tuple must not * conflict with existing tuples. */ - if (pk->check_is_unique && - vy_check_is_unique(env, tx, vy_tx_read_view(tx), space_name(space), - index_name_by_id(space, pk->index_id), - pk, stmt) != 0) + if (vy_check_is_unique_primary(env, tx, vy_tx_read_view(tx), + space_name(space), + index_name_by_id(space, pk->index_id), + pk, stmt) != 0) return -1; return vy_tx_set(tx, pk, stmt); } @@ -1433,161 +1555,6 @@ vy_insert_secondary(struct vy_env *env, struct vy_tx *tx, struct space *space, return vy_tx_set(tx, lsm, stmt); } -/** - * Execute REPLACE in a space with a single index, possibly with - * lookup for an old tuple if the space has at least one - * on_replace trigger. - * @param env Vinyl environment. - * @param tx Current transaction. - * @param space Space in which replace. - * @param request Request with the tuple data. - * @param stmt Statement for triggers is filled with old - * statement. - * - * @retval 0 Success. 
- * @retval -1 Memory error OR duplicate key error OR the primary - * index is not found OR a tuple reference increment - * error. - */ -static inline int -vy_replace_one(struct vy_env *env, struct vy_tx *tx, struct space *space, - struct request *request, struct txn_stmt *stmt) -{ - (void)env; - assert(tx != NULL && tx->state == VINYL_TX_READY); - struct vy_lsm *pk = vy_lsm(space->index[0]); - assert(pk->index_id == 0); - if (tuple_validate_raw(pk->mem_format, request->tuple)) - return -1; - struct tuple *new_tuple = - vy_stmt_new_replace(pk->mem_format, request->tuple, - request->tuple_end); - if (new_tuple == NULL) - return -1; - /** - * If the space has triggers, then we need to fetch the - * old tuple to pass it to the trigger. - */ - if (stmt != NULL && !rlist_empty(&space->on_replace)) { - if (vy_lsm_get(pk, tx, vy_tx_read_view(tx), - new_tuple, &stmt->old_tuple) != 0) - goto error_unref; - } - if (vy_tx_set(tx, pk, new_tuple)) - goto error_unref; - - if (stmt != NULL) - stmt->new_tuple = new_tuple; - else - tuple_unref(new_tuple); - return 0; - -error_unref: - tuple_unref(new_tuple); - return -1; -} - -/** - * Execute REPLACE in a space with multiple indexes and lookup for - * an old tuple, that should has been set in \p stmt->old_tuple if - * the space has at least one on_replace trigger. - * @param env Vinyl environment. - * @param tx Current transaction. - * @param space Vinyl space. - * @param request Request with the tuple data. - * @param stmt Statement for triggers filled with old - * statement. - * - * @retval 0 Success - * @retval -1 Memory error OR duplicate key error OR the primary - * index is not found OR a tuple reference increment - * error. - */ -static inline int -vy_replace_impl(struct vy_env *env, struct vy_tx *tx, struct space *space, - struct request *request, struct txn_stmt *stmt) -{ - assert(tx != NULL && tx->state == VINYL_TX_READY); - struct tuple *old_stmt = NULL; - struct tuple *new_stmt = NULL; - struct tuple *delete = NULL; - struct vy_lsm *pk = vy_lsm_find(space, 0); - if (pk == NULL) /* space has no primary key */ - return -1; - /* Primary key is dumped last. */ - assert(!vy_is_committed_one(env, pk)); - assert(pk->index_id == 0); - if (tuple_validate_raw(pk->mem_format, request->tuple)) - return -1; - new_stmt = vy_stmt_new_replace(pk->mem_format, request->tuple, - request->tuple_end); - if (new_stmt == NULL) - return -1; - - /* Get full tuple from the primary index. */ - if (vy_lsm_get(pk, tx, vy_tx_read_view(tx), - new_stmt, &old_stmt) != 0) - goto error; - - if (old_stmt == NULL) { - /* - * We can turn REPLACE into INSERT if the new key - * does not have history. - */ - vy_stmt_set_type(new_stmt, IPROTO_INSERT); - } - - /* - * Replace in the primary index without explicit deletion - * of the old tuple. - */ - if (vy_tx_set(tx, pk, new_stmt) != 0) - goto error; - - if (space->index_count > 1 && old_stmt != NULL) { - delete = vy_stmt_new_surrogate_delete(pk->mem_format, old_stmt); - if (delete == NULL) - goto error; - } - - /* Update secondary keys, avoid duplicates. */ - for (uint32_t iid = 1; iid < space->index_count; ++iid) { - struct vy_lsm *lsm = vy_lsm(space->index[iid]); - if (vy_is_committed_one(env, lsm)) - continue; - /* - * Delete goes first, so if old and new keys - * fully match, there is no look up beyond the - * transaction index. 
- */ - if (old_stmt != NULL) { - if (vy_tx_set(tx, lsm, delete) != 0) - goto error; - } - if (vy_insert_secondary(env, tx, space, lsm, new_stmt) != 0) - goto error; - } - if (delete != NULL) - tuple_unref(delete); - /* - * The old tuple is used if there is an on_replace - * trigger. - */ - if (stmt != NULL) { - stmt->new_tuple = new_stmt; - stmt->old_tuple = old_stmt; - } - return 0; -error: - if (delete != NULL) - tuple_unref(delete); - if (old_stmt != NULL) - tuple_unref(old_stmt); - if (new_stmt != NULL) - tuple_unref(new_stmt); - return -1; -} - /** * Check that the key can be used for search in a unique index * LSM tree. @@ -1624,92 +1591,6 @@ vy_unique_key_validate(struct vy_lsm *lsm, const char *key, return key_validate_parts(lsm->cmp_def, key, part_count, false); } -/** - * Find a tuple in the primary index LSM tree by the key of the - * specified LSM tree. - * @param lsm LSM tree for which the key is specified. - * Can be both primary and secondary. - * @param tx Current transaction. - * @param rv Read view. - * @param key_raw MessagePack'ed data, the array without a - * header. - * @param part_count Count of parts in the key. - * @param[out] result The found statement is stored here. Must be - * unreferenced after usage. - * - * @retval 0 Success. - * @retval -1 Memory error. - */ -static inline int -vy_lsm_full_by_key(struct vy_lsm *lsm, struct vy_tx *tx, - const struct vy_read_view **rv, - const char *key_raw, uint32_t part_count, - struct tuple **result) -{ - int rc; - struct tuple *key = vy_stmt_new_select(lsm->env->key_format, - key_raw, part_count); - if (key == NULL) - return -1; - struct tuple *found; - rc = vy_lsm_get(lsm, tx, rv, key, &found); - tuple_unref(key); - if (rc != 0) - return -1; - if (lsm->index_id == 0 || found == NULL) { - *result = found; - return 0; - } - /* - * No need in vy_tx_track() as the tuple is already - * tracked in the secondary index LSM tree. - */ - rc = vy_point_lookup(lsm->pk, tx, rv, found, result); - tuple_unref(found); - return rc; -} - -/** - * Delete the tuple from all LSM trees of the vinyl space. - * @param env Vinyl environment. - * @param tx Current transaction. - * @param space Vinyl space. - * @param tuple Tuple to delete. - * - * @retval 0 Success - * @retval -1 Memory error or the index is not found. - */ -static inline int -vy_delete_impl(struct vy_env *env, struct vy_tx *tx, struct space *space, - const struct tuple *tuple) -{ - struct vy_lsm *pk = vy_lsm_find(space, 0); - if (pk == NULL) - return -1; - /* Primary key is dumped last. */ - assert(!vy_is_committed_one(env, pk)); - struct tuple *delete = - vy_stmt_new_surrogate_delete(pk->mem_format, tuple); - if (delete == NULL) - return -1; - if (vy_tx_set(tx, pk, delete) != 0) - goto error; - - /* At second, delete from seconary indexes. */ - for (uint32_t i = 1; i < space->index_count; ++i) { - struct vy_lsm *lsm = vy_lsm(space->index[i]); - if (vy_is_committed_one(env, lsm)) - continue; - if (vy_tx_set(tx, lsm, delete) != 0) - goto error; - } - tuple_unref(delete); - return 0; -error: - tuple_unref(delete); - return -1; -} - /** * Execute DELETE in a vinyl space. * @param env Vinyl environment. @@ -1751,27 +1632,38 @@ vy_delete(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt, * and pass them to indexes for deletion. 
*/ if (has_secondary || !rlist_empty(&space->on_replace)) { - if (vy_lsm_full_by_key(lsm, tx, vy_tx_read_view(tx), - key, part_count, &stmt->old_tuple) != 0) + if (vy_get_by_raw_key(lsm, tx, vy_tx_read_view(tx), + key, part_count, &stmt->old_tuple) != 0) return -1; if (stmt->old_tuple == NULL) return 0; } + int rc = 0; + struct tuple *delete; if (has_secondary) { assert(stmt->old_tuple != NULL); - return vy_delete_impl(env, tx, space, stmt->old_tuple); + delete = vy_stmt_new_surrogate_delete(pk->mem_format, + stmt->old_tuple); + if (delete == NULL) + return -1; + for (uint32_t i = 0; i < space->index_count; i++) { + struct vy_lsm *lsm = vy_lsm(space->index[i]); + if (vy_is_committed_one(env, lsm)) + continue; + rc = vy_tx_set(tx, lsm, delete); + if (rc != 0) + break; + } } else { /* Primary is the single index in the space. */ assert(lsm->index_id == 0); - struct tuple *delete = - vy_stmt_new_surrogate_delete_from_key(request->key, - pk->key_def, - pk->mem_format); + delete = vy_stmt_new_surrogate_delete_from_key(request->key, + pk->key_def, pk->mem_format); if (delete == NULL) return -1; - int rc = vy_tx_set(tx, pk, delete); - tuple_unref(delete); - return rc; + rc = vy_tx_set(tx, pk, delete); } + tuple_unref(delete); + return rc; } /** @@ -1833,8 +1725,8 @@ vy_update(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt, if (vy_unique_key_validate(lsm, key, part_count)) return -1; - if (vy_lsm_full_by_key(lsm, tx, vy_tx_read_view(tx), - key, part_count, &stmt->old_tuple) != 0) + if (vy_get_by_raw_key(lsm, tx, vy_tx_read_view(tx), + key, part_count, &stmt->old_tuple) != 0) return -1; /* Nothing to update. */ if (stmt->old_tuple == NULL) @@ -2107,8 +1999,7 @@ vy_upsert(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt, pk->key_def, pk->env->key_format); if (key == NULL) return -1; - int rc = vy_lsm_get(pk, tx, vy_tx_read_view(tx), - key, &stmt->old_tuple); + int rc = vy_get(pk, tx, vy_tx_read_view(tx), key, &stmt->old_tuple); tuple_unref(key); if (rc != 0) return -1; @@ -2257,18 +2148,86 @@ static int vy_replace(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt, struct space *space, struct request *request) { + assert(tx != NULL && tx->state == VINYL_TX_READY); if (vy_is_committed(env, space)) return 0; if (request->type == IPROTO_INSERT) return vy_insert(env, tx, stmt, space, request); - if (space->index_count == 1) { - /* Replace in a space with a single index. */ - return vy_replace_one(env, tx, space, request, stmt); - } else { - /* Replace in a space with secondary indexes. */ - return vy_replace_impl(env, tx, space, request, stmt); + struct vy_lsm *pk = vy_lsm_find(space, 0); + if (pk == NULL) + return -1; + /* Primary key is dumped last. */ + assert(!vy_is_committed_one(env, pk)); + + /* Validate and create a statement for the new tuple. */ + if (tuple_validate_raw(pk->mem_format, request->tuple)) + return -1; + stmt->new_tuple = vy_stmt_new_replace(pk->mem_format, request->tuple, + request->tuple_end); + if (stmt->new_tuple == NULL) + return -1; + /* + * Get the overwritten tuple from the primary index if + * the space has on_replace triggers, in which case we + * need to pass the old tuple to trigger callbacks, or + * if the space has secondary indexes and so we need + * the old tuple to delete it from them. 
+ */ + if (space->index_count > 1 || !rlist_empty(&space->on_replace)) { + if (vy_get(pk, tx, vy_tx_read_view(tx), + stmt->new_tuple, &stmt->old_tuple) != 0) + return -1; + if (stmt->old_tuple == NULL) { + /* + * We can turn REPLACE into INSERT if the + * new key does not have history. + */ + vy_stmt_set_type(stmt->new_tuple, IPROTO_INSERT); + } + } + /* + * Replace in the primary index without explicit deletion + * of the old tuple. + */ + if (vy_tx_set(tx, pk, stmt->new_tuple) != 0) + return -1; + if (space->index_count == 1) + return 0; + /* + * Replace in secondary indexes with explicit deletion + * of the old tuple, if any. + */ + int rc = 0; + struct tuple *delete = NULL; + if (stmt->old_tuple != NULL) { + delete = vy_stmt_new_surrogate_delete(pk->mem_format, + stmt->old_tuple); + if (delete == NULL) + return -1; + } + for (uint32_t i = 1; i < space->index_count; i++) { + struct vy_lsm *lsm = vy_lsm(space->index[i]); + if (vy_is_committed_one(env, lsm)) + continue; + /* + * DELETE goes first, so if old and new keys + * fully match, there is no look up beyond the + * transaction write set. + */ + if (delete != NULL) { + rc = vy_tx_set(tx, lsm, delete); + if (rc != 0) + break; + } + rc = vy_insert_secondary(env, tx, space, lsm, + stmt->new_tuple); + if (rc != 0) + break; } + if (delete != NULL) + tuple_unref(delete); + return rc; } static int @@ -3533,11 +3492,6 @@ vy_squash_process(struct vy_squash *squash) struct vy_lsm *lsm = squash->lsm; struct vy_env *env = squash->env; - /* - * vy_apply_upsert() is used for primary key only, - * so this is the same as lsm->key_def - */ - struct key_def *def = lsm->cmp_def; /* Upserts enabled only in the primary index LSM tree. */ assert(lsm->index_id == 0); @@ -3555,8 +3509,10 @@ vy_squash_process(struct vy_squash *squash) /* * While we were reading on-disk runs, new statements could - * have been inserted into the in-memory tree. Apply them to - * the result. + * have been prepared for the squashed key. We mustn't apply + * them, because they may be rolled back, but we must adjust + * their n_upserts counter so that they will get squashed by + * vy_lsm_commit_upsert(). */ struct vy_mem *mem = lsm->mem; struct tree_mem_key tree_key = { @@ -3573,108 +3529,20 @@ vy_squash_process(struct vy_squash *squash) tuple_unref(result); return 0; } - /** - * Algorithm of the squashing. - * Assume, during building the non-UPSERT statement - * 'result' in the mem some new UPSERTs were inserted, and - * some of them were commited, while the other were just - * prepared. And lets UPSERT_THRESHOLD to be equal to 3, - * for example. - * Mem - * -------------------------------------+ - * UPSERT, lsn = 1, n_ups = 0 | - * UPSERT, lsn = 2, n_ups = 1 | Commited - * UPSERT, lsn = 3, n_ups = 2 | - * -------------------------------------+ - * UPSERT, lsn = MAX, n_ups = 3 | - * UPSERT, lsn = MAX + 1, n_ups = 4 | Prepared - * UPSERT, lsn = MAX + 2, n_ups = 5 | - * -------------------------------------+ - * In such a case the UPSERT statements with - * lsns = {1, 2, 3} are squashed. But now the n_upsert - * values in the prepared statements are not correct. - * If we will not update values, then the - * vy_lsm_commit_upsert will not be able to squash them. 
- * - * So after squashing it is necessary to update n_upsert - * value in the prepared statements: - * Mem - * -------------------------------------+ - * UPSERT, lsn = 1, n_ups = 0 | - * UPSERT, lsn = 2, n_ups = 1 | Commited - * REPLACE, lsn = 3 | - * -------------------------------------+ - * UPSERT, lsn = MAX, n_ups = 0 !!! | - * UPSERT, lsn = MAX + 1, n_ups = 1 !!! | Prepared - * UPSERT, lsn = MAX + 2, n_ups = 2 !!! | - * -------------------------------------+ - */ vy_mem_tree_iterator_prev(&mem->tree, &mem_itr); - const struct tuple *mem_stmt; - int64_t stmt_lsn; - /* - * According to the described algorithm, squash the - * commited UPSERTs at first. - */ + uint8_t n_upserts = 0; while (!vy_mem_tree_iterator_is_invalid(&mem_itr)) { + const struct tuple *mem_stmt; mem_stmt = *vy_mem_tree_iterator_get_elem(&mem->tree, &mem_itr); - stmt_lsn = vy_stmt_lsn(mem_stmt); - if (vy_tuple_compare(result, mem_stmt, def) != 0) - break; - /** - * Leave alone prepared statements; they will be handled - * in vy_range_commit_stmt. - */ - if (stmt_lsn >= MAX_LSN) + if (vy_tuple_compare(result, mem_stmt, lsm->cmp_def) != 0 || + vy_stmt_type(mem_stmt) != IPROTO_UPSERT) break; - if (vy_stmt_type(mem_stmt) != IPROTO_UPSERT) { - /** - * Somebody inserted non-upsert statement, - * squashing is useless. - */ - tuple_unref(result); - return 0; - } - assert(lsm->index_id == 0); - struct tuple *applied = vy_apply_upsert(mem_stmt, result, def, - mem->format, true); - lsm->stat.upsert.applied++; - tuple_unref(result); - if (applied == NULL) - return -1; - result = applied; - /** - * In normal cases we get a result with the same lsn as - * in mem_stmt. - * But if there are buggy upserts that do wrong things, - * they are ignored and the result has lower lsn. - * We should fix the lsn in any case to replace - * exactly mem_stmt in general and the buggy upsert - * in particular. - */ - vy_stmt_set_lsn(result, stmt_lsn); + assert(vy_stmt_lsn(mem_stmt) >= MAX_LSN); + vy_stmt_set_n_upserts((struct tuple *)mem_stmt, n_upserts); + if (n_upserts <= VY_UPSERT_THRESHOLD) + ++n_upserts; vy_mem_tree_iterator_prev(&mem->tree, &mem_itr); } - /* - * The second step of the algorithm above is updating of - * n_upsert values of the prepared UPSERTs. - */ - if (stmt_lsn >= MAX_LSN) { - uint8_t n_upserts = 0; - while (!vy_mem_tree_iterator_is_invalid(&mem_itr)) { - mem_stmt = *vy_mem_tree_iterator_get_elem(&mem->tree, - &mem_itr); - if (vy_tuple_compare(result, mem_stmt, def) != 0 || - vy_stmt_type(mem_stmt) != IPROTO_UPSERT) - break; - assert(vy_stmt_lsn(mem_stmt) >= MAX_LSN); - vy_stmt_set_n_upserts((struct tuple *)mem_stmt, - n_upserts); - if (n_upserts <= VY_UPSERT_THRESHOLD) - ++n_upserts; - vy_mem_tree_iterator_prev(&mem->tree, &mem_itr); - } - } lsm->stat.upsert.squashed++; @@ -3860,6 +3728,7 @@ vinyl_iterator_primary_next(struct iterator *base, struct tuple **ret) goto fail; if (vy_read_iterator_next(&it->iterator, ret) != 0) goto fail; + vy_read_iterator_cache_add(&it->iterator, *ret); if (*ret == NULL) { /* EOF. Close the iterator immediately. */ vinyl_iterator_close(it); @@ -3889,6 +3758,7 @@ vinyl_iterator_secondary_next(struct iterator *base, struct tuple **ret) if (tuple == NULL) { /* EOF. Close the iterator immediately. */ + vy_read_iterator_cache_add(&it->iterator, NULL); vinyl_iterator_close(it); *ret = NULL; return 0; @@ -3901,28 +3771,14 @@ vinyl_iterator_secondary_next(struct iterator *base, struct tuple **ret) fiber_sleep(0.01); } #endif - /* - * Get the full tuple from the primary index. 
- * Note, there's no need in vy_tx_track() as the - * tuple is already tracked in the secondary index. - */ - if (vy_point_lookup(it->lsm->pk, it->tx, vy_tx_read_view(it->tx), - tuple, ret) != 0) + /* Get the full tuple from the primary index. */ + if (vy_get_by_secondary_tuple(it->lsm, it->tx, + vy_tx_read_view(it->tx), + tuple, ret) != 0) goto fail; - if (*ret == NULL) { - /* - * All indexes of a space must be consistent, i.e. - * if a tuple is present in one index, it must be - * present in all other indexes as well, so we can - * get here only if there's a bug somewhere in vinyl. - * Don't abort as core dump won't really help us in - * this case. Just warn the user and proceed to the - * next tuple. - */ - say_warn("%s: key %s missing in primary index", - vy_lsm_name(it->lsm), vy_stmt_str(tuple)); + if (*ret == NULL) goto next; - } + vy_read_iterator_cache_add(&it->iterator, *ret); tuple_bless(*ret); tuple_unref(*ret); return 0; @@ -4011,7 +3867,7 @@ vinyl_index_get(struct index *index, const char *key, const struct vy_read_view **rv = (tx != NULL ? vy_tx_read_view(tx) : &env->xm->p_global_read_view); - if (vy_lsm_full_by_key(lsm, tx, rv, key, part_count, ret) != 0) + if (vy_get_by_raw_key(lsm, tx, rv, key, part_count, ret) != 0) return -1; if (*ret != NULL) { tuple_bless(*ret); diff --git a/src/box/vy_point_lookup.c b/src/box/vy_point_lookup.c index 504a8e80d20f1255469f574202e990bcb510ac1f..5e43340b90fa76d154e610bde9545c29764a9bec 100644 --- a/src/box/vy_point_lookup.c +++ b/src/box/vy_point_lookup.c @@ -203,10 +203,13 @@ vy_point_lookup(struct vy_lsm *lsm, struct vy_tx *tx, int rc = 0; lsm->stat.lookup++; + /* History list */ - struct vy_history history; + struct vy_history history, mem_history, disk_history; vy_history_create(&history, &lsm->env->history_node_pool); -restart: + vy_history_create(&mem_history, &lsm->env->history_node_pool); + vy_history_create(&disk_history, &lsm->env->history_node_pool); + rc = vy_point_lookup_scan_txw(lsm, tx, key, &history); if (rc != 0 || vy_history_is_terminal(&history)) goto done; @@ -215,14 +218,16 @@ vy_point_lookup(struct vy_lsm *lsm, struct vy_tx *tx, if (rc != 0 || vy_history_is_terminal(&history)) goto done; - rc = vy_point_lookup_scan_mems(lsm, rv, key, &history); - if (rc != 0 || vy_history_is_terminal(&history)) +restart: + rc = vy_point_lookup_scan_mems(lsm, rv, key, &mem_history); + if (rc != 0 || vy_history_is_terminal(&mem_history)) goto done; /* Save version before yield */ + uint32_t mem_version = lsm->mem->version; uint32_t mem_list_version = lsm->mem_list_version; - rc = vy_point_lookup_scan_slices(lsm, rv, key, &history); + rc = vy_point_lookup_scan_slices(lsm, rv, key, &disk_history); if (rc != 0) goto done; @@ -241,11 +246,29 @@ vy_point_lookup(struct vy_lsm *lsm, struct vy_tx *tx, * This in unnecessary in case of rotation but since we * cannot distinguish these two cases we always restart. */ - vy_history_cleanup(&history); + vy_history_cleanup(&mem_history); + vy_history_cleanup(&disk_history); goto restart; } + if (mem_version != lsm->mem->version) { + /* + * Rescan the memory level if its version changed while we + * were reading disk, because there may be new statements + * matching the search key. 
+ */ + vy_history_cleanup(&mem_history); + rc = vy_point_lookup_scan_mems(lsm, rv, key, &mem_history); + if (rc != 0) + goto done; + if (vy_history_is_terminal(&mem_history)) + vy_history_cleanup(&disk_history); + } + done: + vy_history_splice(&history, &mem_history); + vy_history_splice(&history, &disk_history); + if (rc == 0) { int upserts_applied; rc = vy_history_apply(&history, lsm->cmp_def, lsm->mem_format, @@ -257,11 +280,8 @@ vy_point_lookup(struct vy_lsm *lsm, struct vy_tx *tx, if (rc != 0) return -1; - if (*ret != NULL) { + if (*ret != NULL) vy_stmt_counter_acct_tuple(&lsm->stat.get, *ret); - if ((*rv)->vlsn == INT64_MAX) - vy_cache_add(&lsm->cache, *ret, NULL, key, ITER_EQ); - } double latency = ev_monotonic_now(loop()) - start_time; latency_collect(&lsm->stat.latency, latency); diff --git a/src/box/vy_point_lookup.h b/src/box/vy_point_lookup.h index d74be9a9dabb5c255e0e0349e50ee145d68c0605..3b7c5a04cd8ce5fa2838bec238205ccefa274bf6 100644 --- a/src/box/vy_point_lookup.h +++ b/src/box/vy_point_lookup.h @@ -62,12 +62,9 @@ struct tuple; * tuple in the LSM tree. The tuple is returned in @ret with its * reference counter elevated. * - * The caller must guarantee that if the tuple looked up by this - * function is modified, the transaction will be sent to read view. - * This is needed to avoid inserting a stale value into the cache. - * In other words, vy_tx_track() must be called for the search key - * before calling this function unless this is a primary index and - * the tuple is already tracked in a secondary index. + * Note, this function doesn't track the result in the transaction + * read set, i.e. it is up to the caller to call vy_tx_track() if + * necessary. */ int vy_point_lookup(struct vy_lsm *lsm, struct vy_tx *tx, diff --git a/src/box/vy_read_iterator.c b/src/box/vy_read_iterator.c index 160bb899440283a5ad193d73b4a672d3056fa031..954fc0df6a8ca84cc04de44eba1439caeba5fad4 100644 --- a/src/box/vy_read_iterator.c +++ b/src/box/vy_read_iterator.c @@ -845,24 +845,17 @@ vy_read_iterator_next(struct vy_read_iterator *itr, struct tuple **result) ev_tstamp start_time = ev_monotonic_now(loop()); struct vy_lsm *lsm = itr->lsm; - struct tuple *stmt, *prev_stmt; + struct tuple *stmt; - /* - * Remember the statement returned by the last iteration. - * We will need it to update the cache. - */ - prev_stmt = itr->last_stmt; - if (prev_stmt != NULL) - tuple_ref(prev_stmt); - else /* first iteration */ - lsm->stat.lookup++; + if (itr->last_stmt == NULL) + lsm->stat.lookup++; /* first iteration */ next_key: if (vy_read_iterator_advance(itr) != 0) - goto err; + return -1; if (vy_read_iterator_apply_history(itr, &stmt) != 0) - goto err; + return -1; if (vy_read_iterator_track_read(itr, stmt) != 0) - goto err; + return -1; if (itr->last_stmt != NULL) tuple_unref(itr->last_stmt); @@ -877,9 +870,9 @@ vy_read_iterator_next(struct vy_read_iterator *itr, struct tuple **result) * previous + current tuple as an unbroken chain. */ if (vy_stmt_lsn(stmt) == INT64_MAX) { - if (prev_stmt != NULL) - tuple_unref(prev_stmt); - prev_stmt = NULL; + if (itr->last_cached_stmt != NULL) + tuple_unref(itr->last_cached_stmt); + itr->last_cached_stmt = NULL; } goto next_key; } @@ -887,18 +880,6 @@ vy_read_iterator_next(struct vy_read_iterator *itr, struct tuple **result) vy_stmt_type(stmt) == IPROTO_INSERT || vy_stmt_type(stmt) == IPROTO_REPLACE); - /* - * Store the result in the cache provided we are reading - * the latest data. 
- */ - if ((**itr->read_view).vlsn == INT64_MAX) { - vy_cache_add(&lsm->cache, stmt, prev_stmt, - itr->key, itr->iterator_type); - } - if (prev_stmt != NULL) - tuple_unref(prev_stmt); - - /* Update LSM tree stats. */ if (stmt != NULL) vy_stmt_counter_acct_tuple(&lsm->stat.get, stmt); @@ -914,10 +895,24 @@ vy_read_iterator_next(struct vy_read_iterator *itr, struct tuple **result) *result = stmt; return 0; -err: - if (prev_stmt != NULL) - tuple_unref(prev_stmt); - return -1; +} + +void +vy_read_iterator_cache_add(struct vy_read_iterator *itr, struct tuple *stmt) +{ + if ((**itr->read_view).vlsn != INT64_MAX) { + if (itr->last_cached_stmt != NULL) + tuple_unref(itr->last_cached_stmt); + itr->last_cached_stmt = NULL; + return; + } + vy_cache_add(&itr->lsm->cache, stmt, itr->last_cached_stmt, + itr->key, itr->iterator_type); + if (stmt != NULL) + tuple_ref(stmt); + if (itr->last_cached_stmt != NULL) + tuple_unref(itr->last_cached_stmt); + itr->last_cached_stmt = stmt; } /** @@ -928,6 +923,8 @@ vy_read_iterator_close(struct vy_read_iterator *itr) { if (itr->last_stmt != NULL) tuple_unref(itr->last_stmt); + if (itr->last_cached_stmt != NULL) + tuple_unref(itr->last_cached_stmt); vy_read_iterator_cleanup(itr); free(itr->src); TRASH(itr); diff --git a/src/box/vy_read_iterator.h b/src/box/vy_read_iterator.h index 2cac1087c09089e40ce0431ea1a1f481d086b922..baab8859231af8a72ff832652365beac3a199e0b 100644 --- a/src/box/vy_read_iterator.h +++ b/src/box/vy_read_iterator.h @@ -64,6 +64,11 @@ struct vy_read_iterator { bool need_check_eq; /** Last statement returned by vy_read_iterator_next(). */ struct tuple *last_stmt; + /** + * Last statement added to the tuple cache by + * vy_read_iterator_cache_add(). + */ + struct tuple *last_cached_stmt; /** * Copy of lsm->range_tree_version. * Used for detecting range tree changes. @@ -141,6 +146,25 @@ vy_read_iterator_open(struct vy_read_iterator *itr, struct vy_lsm *lsm, NODISCARD int vy_read_iterator_next(struct vy_read_iterator *itr, struct tuple **result); +/** + * Add the last tuple returned by the read iterator to the cache. + * @param itr Read iterator + * @param stmt Last tuple returned by the iterator. + * + * We use a separate function for populating the cache rather than + * doing that right in vy_read_iterator_next() so that we can store + * full tuples in a secondary index cache, thus saving some memory. + * + * Usage pattern: + * - Call vy_read_iterator_next() to get a partial tuple. + * - Call vy_point_lookup() to get the full tuple corresponding + * to the partial tuple returned by the iterator. + * - Call vy_read_iterator_cache_add() on the full tuple to add + * the result to the cache. + */ +void +vy_read_iterator_cache_add(struct vy_read_iterator *itr, struct tuple *stmt); + /** * Close the iterator and free resources. */ diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c index a82fe9f2c17e5188d3ee2af008cd8ddd2f8a94ed..b206a6057e4f1d233679172bd092e01795e88df9 100644 --- a/src/box/vy_scheduler.c +++ b/src/box/vy_scheduler.c @@ -46,6 +46,7 @@ #include "errinj.h" #include "fiber.h" #include "fiber_cond.h" +#include "cbus.h" #include "salad/stailq.h" #include "say.h" #include "vy_lsm.h" @@ -55,14 +56,34 @@ #include "vy_run.h" #include "vy_write_iterator.h" #include "trivia/util.h" -#include "tt_pthread.h" /* Min and max values for vy_scheduler::timeout. 
*/ #define VY_SCHEDULER_TIMEOUT_MIN 1 #define VY_SCHEDULER_TIMEOUT_MAX 60 -static void *vy_worker_f(void *); +static int vy_worker_f(va_list); static int vy_scheduler_f(va_list); +static void vy_task_execute_f(struct cmsg *); +static void vy_task_complete_f(struct cmsg *); + +static const struct cmsg_hop vy_task_execute_route[] = { + { vy_task_execute_f, NULL }, +}; + +static const struct cmsg_hop vy_task_complete_route[] = { + { vy_task_complete_f, NULL }, +}; + +/** Vinyl worker thread. */ +struct vy_worker { + struct cord cord; + /** Pipe from tx to the worker thread. */ + struct cpipe worker_pipe; + /** Pipe from the worker thread to tx. */ + struct cpipe tx_pipe; + /** Link in vy_scheduler::idle_workers. */ + struct stailq_entry in_idle; +}; struct vy_task; @@ -72,31 +93,42 @@ struct vy_task_ops { * which is too heavy for the tx thread (like IO or compression). * Returns 0 on success. On failure returns -1 and sets diag. */ - int (*execute)(struct vy_scheduler *scheduler, struct vy_task *task); + int (*execute)(struct vy_task *task); /** * This function is called by the scheduler upon task completion. * It may be used to finish the task from the tx thread context. * * Returns 0 on success. On failure returns -1 and sets diag. */ - int (*complete)(struct vy_scheduler *scheduler, struct vy_task *task); + int (*complete)(struct vy_task *task); /** * This function is called by the scheduler if either ->execute * or ->complete failed. It may be used to undo changes done to * the LSM tree when preparing the task. - * - * If @in_shutdown is set, the callback is invoked from the - * engine destructor. */ - void (*abort)(struct vy_scheduler *scheduler, struct vy_task *task, - bool in_shutdown); + void (*abort)(struct vy_task *task); }; struct vy_task { + /** + * CBus message used for sending the task to/from + * a worker thread. + */ + struct cmsg cmsg; + /** Virtual method table. */ const struct vy_task_ops *ops; - /** Return code of ->execute. */ - int status; - /** If ->execute fails, the error is stored here. */ + /** Pointer to the scheduler. */ + struct vy_scheduler *scheduler; + /** Worker thread this task is assigned to. */ + struct vy_worker *worker; + /** + * Fiber that is currently executing this task in + * a worker thread. + */ + struct fiber *fiber; + /** Set if the task failed. */ + bool is_failed; + /** In case of task failure the error is stored here. */ struct diag diag; /** LSM tree this task is for. */ struct vy_lsm *lsm; @@ -120,11 +152,6 @@ struct vy_task { * need to remember the slices we are compacting. */ struct vy_slice *first_slice, *last_slice; - /** - * Link in the list of pending or processed tasks. - * See vy_scheduler::input_queue, output_queue. - */ - struct stailq_entry link; /** * Index options may be modified while a task is in * progress so we save them here to safely access them @@ -132,6 +159,8 @@ struct vy_task { */ double bloom_fpr; int64_t page_size; + /** Link in vy_scheduler::processed_tasks. */ + struct stailq_entry in_processed; }; /** @@ -142,10 +171,10 @@ struct vy_task { * does not free it from under us. 
*/ static struct vy_task * -vy_task_new(struct mempool *pool, struct vy_lsm *lsm, +vy_task_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm, const struct vy_task_ops *ops) { - struct vy_task *task = mempool_alloc(pool); + struct vy_task *task = mempool_alloc(&scheduler->task_pool); if (task == NULL) { diag_set(OutOfMemory, sizeof(*task), "mempool", "struct vy_task"); @@ -153,16 +182,17 @@ vy_task_new(struct mempool *pool, struct vy_lsm *lsm, } memset(task, 0, sizeof(*task)); task->ops = ops; + task->scheduler = scheduler; task->lsm = lsm; task->cmp_def = key_def_dup(lsm->cmp_def); if (task->cmp_def == NULL) { - mempool_free(pool, task); + mempool_free(&scheduler->task_pool, task); return NULL; } task->key_def = key_def_dup(lsm->key_def); if (task->key_def == NULL) { key_def_delete(task->cmp_def); - mempool_free(pool, task); + mempool_free(&scheduler->task_pool, task); return NULL; } vy_lsm_ref(lsm); @@ -172,14 +202,13 @@ vy_task_new(struct mempool *pool, struct vy_lsm *lsm, /** Free a task allocated with vy_task_new(). */ static void -vy_task_delete(struct mempool *pool, struct vy_task *task) +vy_task_delete(struct vy_task *task) { key_def_delete(task->cmp_def); key_def_delete(task->key_def); vy_lsm_unref(task->lsm); diag_destroy(&task->diag); - TRASH(task); - mempool_free(pool, task); + mempool_free(&task->scheduler->task_pool, task); } static bool @@ -242,71 +271,43 @@ vy_compact_heap_less(struct heap_node *a, struct heap_node *b) #undef HEAP_LESS #undef HEAP_NAME -static void -vy_scheduler_async_cb(ev_loop *loop, struct ev_async *watcher, int events) -{ - (void)loop; - (void)events; - struct vy_scheduler *scheduler = container_of(watcher, - struct vy_scheduler, scheduler_async); - fiber_cond_signal(&scheduler->scheduler_cond); -} - static void vy_scheduler_start_workers(struct vy_scheduler *scheduler) { - assert(!scheduler->is_worker_pool_running); + assert(scheduler->worker_pool == NULL); /* One thread is reserved for dumps, see vy_schedule(). */ assert(scheduler->worker_pool_size >= 2); - scheduler->is_worker_pool_running = true; - scheduler->workers_available = scheduler->worker_pool_size; + scheduler->idle_worker_count = scheduler->worker_pool_size; scheduler->worker_pool = calloc(scheduler->worker_pool_size, - sizeof(struct cord)); + sizeof(*scheduler->worker_pool)); if (scheduler->worker_pool == NULL) panic("failed to allocate vinyl worker pool"); - ev_async_start(scheduler->scheduler_loop, &scheduler->scheduler_async); for (int i = 0; i < scheduler->worker_pool_size; i++) { char name[FIBER_NAME_MAX]; snprintf(name, sizeof(name), "vinyl.writer.%d", i); - if (cord_start(&scheduler->worker_pool[i], name, - vy_worker_f, scheduler) != 0) + struct vy_worker *worker = &scheduler->worker_pool[i]; + if (cord_costart(&worker->cord, name, vy_worker_f, worker) != 0) panic("failed to start vinyl worker thread"); + cpipe_create(&worker->worker_pipe, name); + stailq_add_tail_entry(&scheduler->idle_workers, + worker, in_idle); } } static void vy_scheduler_stop_workers(struct vy_scheduler *scheduler) { - struct stailq task_queue; - stailq_create(&task_queue); - - assert(scheduler->is_worker_pool_running); - scheduler->is_worker_pool_running = false; - - /* Clear the input queue and wake up worker threads. */ - tt_pthread_mutex_lock(&scheduler->mutex); - stailq_concat(&task_queue, &scheduler->input_queue); - pthread_cond_broadcast(&scheduler->worker_cond); - tt_pthread_mutex_unlock(&scheduler->mutex); - - /* Wait for worker threads to exit. 
*/ - for (int i = 0; i < scheduler->worker_pool_size; i++) - cord_join(&scheduler->worker_pool[i]); - ev_async_stop(scheduler->scheduler_loop, &scheduler->scheduler_async); - + assert(scheduler->worker_pool != NULL); + for (int i = 0; i < scheduler->worker_pool_size; i++) { + struct vy_worker *worker = &scheduler->worker_pool[i]; + cbus_stop_loop(&worker->worker_pipe); + cpipe_destroy(&worker->worker_pipe); + cord_join(&worker->cord); + } free(scheduler->worker_pool); scheduler->worker_pool = NULL; - - /* Abort all pending tasks. */ - struct vy_task *task, *next; - stailq_concat(&task_queue, &scheduler->output_queue); - stailq_foreach_entry_safe(task, next, &task_queue, link) { - if (task->ops->abort != NULL) - task->ops->abort(scheduler, task, true); - vy_task_delete(&scheduler->task_pool, task); - } } void @@ -325,18 +326,13 @@ vy_scheduler_create(struct vy_scheduler *scheduler, int write_threads, if (scheduler->scheduler_fiber == NULL) panic("failed to allocate vinyl scheduler fiber"); - scheduler->scheduler_loop = loop(); fiber_cond_create(&scheduler->scheduler_cond); - ev_async_init(&scheduler->scheduler_async, vy_scheduler_async_cb); scheduler->worker_pool_size = write_threads; mempool_create(&scheduler->task_pool, cord_slab_cache(), sizeof(struct vy_task)); - stailq_create(&scheduler->input_queue); - stailq_create(&scheduler->output_queue); - - tt_pthread_cond_init(&scheduler->worker_cond, NULL); - tt_pthread_mutex_init(&scheduler->mutex, NULL); + stailq_create(&scheduler->idle_workers); + stailq_create(&scheduler->processed_tasks); vy_dump_heap_create(&scheduler->dump_heap); vy_compact_heap_create(&scheduler->compact_heap); @@ -356,12 +352,9 @@ vy_scheduler_destroy(struct vy_scheduler *scheduler) fiber_cond_signal(&scheduler->dump_cond); fiber_cond_signal(&scheduler->scheduler_cond); - if (scheduler->is_worker_pool_running) + if (scheduler->worker_pool != NULL) vy_scheduler_stop_workers(scheduler); - tt_pthread_cond_destroy(&scheduler->worker_cond); - tt_pthread_mutex_destroy(&scheduler->mutex); - diag_destroy(&scheduler->diag); mempool_destroy(&scheduler->task_pool); fiber_cond_destroy(&scheduler->dump_cond); @@ -660,8 +653,10 @@ vy_run_discard(struct vy_run *run) } static int -vy_task_write_run(struct vy_scheduler *scheduler, struct vy_task *task) +vy_task_write_run(struct vy_task *task) { + enum { YIELD_LOOPS = 32 }; + struct vy_lsm *lsm = task->lsm; struct vy_stmt_stream *wi = task->wi; @@ -683,6 +678,7 @@ vy_task_write_run(struct vy_scheduler *scheduler, struct vy_task *task) if (wi->iface->start(wi) != 0) goto fail_abort_writer; int rc; + int loops = 0; struct tuple *stmt = NULL; while ((rc = wi->iface->next(wi, &stmt)) == 0 && stmt != NULL) { inj = errinj(ERRINJ_VY_RUN_WRITE_STMT_TIMEOUT, ERRINJ_DOUBLE); @@ -693,7 +689,9 @@ vy_task_write_run(struct vy_scheduler *scheduler, struct vy_task *task) if (rc != 0) break; - if (!scheduler->is_worker_pool_running) { + if (++loops % YIELD_LOOPS == 0) + fiber_sleep(0); + if (fiber_is_cancelled()) { diag_set(FiberIsCancelled); rc = -1; break; @@ -715,14 +713,15 @@ vy_task_write_run(struct vy_scheduler *scheduler, struct vy_task *task) } static int -vy_task_dump_execute(struct vy_scheduler *scheduler, struct vy_task *task) +vy_task_dump_execute(struct vy_task *task) { - return vy_task_write_run(scheduler, task); + return vy_task_write_run(task); } static int -vy_task_dump_complete(struct vy_scheduler *scheduler, struct vy_task *task) +vy_task_dump_complete(struct vy_task *task) { + struct vy_scheduler *scheduler = task->scheduler; struct 
vy_lsm *lsm = task->lsm; struct vy_run *new_run = task->new_run; int64_t dump_lsn = new_run->dump_lsn; @@ -888,9 +887,9 @@ vy_task_dump_complete(struct vy_scheduler *scheduler, struct vy_task *task) } static void -vy_task_dump_abort(struct vy_scheduler *scheduler, struct vy_task *task, - bool in_shutdown) +vy_task_dump_abort(struct vy_task *task) { + struct vy_scheduler *scheduler = task->scheduler; struct vy_lsm *lsm = task->lsm; assert(lsm->is_dumping); @@ -902,17 +901,13 @@ vy_task_dump_abort(struct vy_scheduler *scheduler, struct vy_task *task, * It's no use alerting the user if the server is * shutting down or the LSM tree was dropped. */ - if (!in_shutdown && !lsm->is_dropped) { + if (!lsm->is_dropped) { struct error *e = diag_last_error(&task->diag); error_log(e); say_error("%s: dump failed", vy_lsm_name(lsm)); } - /* The metadata log is unavailable on shutdown. */ - if (!in_shutdown) - vy_run_discard(task->new_run); - else - vy_run_unref(task->new_run); + vy_run_discard(task->new_run); lsm->is_dumping = false; vy_scheduler_update_lsm(scheduler, lsm); @@ -997,8 +992,7 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm, return 0; } - struct vy_task *task = vy_task_new(&scheduler->task_pool, - lsm, &dump_ops); + struct vy_task *task = vy_task_new(scheduler, lsm, &dump_ops); if (task == NULL) goto err; @@ -1053,7 +1047,7 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm, err_wi: vy_run_discard(new_run); err_run: - vy_task_delete(&scheduler->task_pool, task); + vy_task_delete(task); err: diag_log(); say_error("%s: could not start dump", vy_lsm_name(lsm)); @@ -1061,14 +1055,15 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm, } static int -vy_task_compact_execute(struct vy_scheduler *scheduler, struct vy_task *task) +vy_task_compact_execute(struct vy_task *task) { - return vy_task_write_run(scheduler, task); + return vy_task_write_run(task); } static int -vy_task_compact_complete(struct vy_scheduler *scheduler, struct vy_task *task) +vy_task_compact_complete(struct vy_task *task) { + struct vy_scheduler *scheduler = task->scheduler; struct vy_lsm *lsm = task->lsm; struct vy_range *range = task->range; struct vy_run *new_run = task->new_run; @@ -1213,9 +1208,9 @@ vy_task_compact_complete(struct vy_scheduler *scheduler, struct vy_task *task) } static void -vy_task_compact_abort(struct vy_scheduler *scheduler, struct vy_task *task, - bool in_shutdown) +vy_task_compact_abort(struct vy_task *task) { + struct vy_scheduler *scheduler = task->scheduler; struct vy_lsm *lsm = task->lsm; struct vy_range *range = task->range; @@ -1226,18 +1221,14 @@ vy_task_compact_abort(struct vy_scheduler *scheduler, struct vy_task *task, * It's no use alerting the user if the server is * shutting down or the LSM tree was dropped. */ - if (!in_shutdown && !lsm->is_dropped) { + if (!lsm->is_dropped) { struct error *e = diag_last_error(&task->diag); error_log(e); say_error("%s: failed to compact range %s", vy_lsm_name(lsm), vy_range_str(range)); } - /* The metadata log is unavailable on shutdown. 
*/ - if (!in_shutdown) - vy_run_discard(task->new_run); - else - vy_run_unref(task->new_run); + vy_run_discard(task->new_run); assert(range->heap_node.pos == UINT32_MAX); vy_range_heap_insert(&lsm->range_heap, &range->heap_node); @@ -1270,8 +1261,7 @@ vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm, return 0; } - struct vy_task *task = vy_task_new(&scheduler->task_pool, - lsm, &compact_ops); + struct vy_task *task = vy_task_new(scheduler, lsm, &compact_ops); if (task == NULL) goto err_task; @@ -1330,7 +1320,7 @@ vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm, err_wi: vy_run_discard(new_run); err_run: - vy_task_delete(&scheduler->task_pool, task); + vy_task_delete(task); err_task: diag_log(); say_error("%s: could not start compacting range %s: %s", @@ -1338,6 +1328,62 @@ vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm, return -1; } +/** + * Fiber function that actually executes a vinyl task. + * After finishing a task, it sends it back to tx. + */ +static int +vy_task_f(va_list va) +{ + struct vy_task *task = va_arg(va, struct vy_task *); + if (task->ops->execute(task) != 0) { + struct diag *diag = diag_get(); + assert(!diag_is_empty(diag)); + task->is_failed = true; + diag_move(diag, &task->diag); + } + cmsg_init(&task->cmsg, vy_task_complete_route); + cpipe_push(&task->worker->tx_pipe, &task->cmsg); + task->fiber = NULL; + return 0; +} + +/** + * Callback invoked by a worker thread upon receiving a task. + * It schedules a fiber which actually executes the task, so + * as not to block the event loop. + */ +static void +vy_task_execute_f(struct cmsg *cmsg) +{ + struct vy_task *task = container_of(cmsg, struct vy_task, cmsg); + assert(task->fiber == NULL); + task->fiber = fiber_new("task", vy_task_f); + if (task->fiber == NULL) { + task->is_failed = true; + diag_move(diag_get(), &task->diag); + cmsg_init(&task->cmsg, vy_task_complete_route); + cpipe_push(&task->worker->tx_pipe, &task->cmsg); + } else { + fiber_start(task->fiber, task); + } +} + +/** + * Callback invoked by the tx thread upon receiving an executed + * task from a worker thread. It adds the task to the processed + * task queue and wakes up the scheduler so that it can complete + * it. + */ +static void +vy_task_complete_f(struct cmsg *cmsg) +{ + struct vy_task *task = container_of(cmsg, struct vy_task, cmsg); + stailq_add_tail_entry(&task->scheduler->processed_tasks, + task, in_processed); + fiber_cond_signal(&task->scheduler->scheduler_cond); +} + /** * Create a task for dumping an LSM tree. The new task is returned * in @ptask. 
If there's no LSM tree that needs to be dumped @ptask @@ -1444,7 +1490,7 @@ vy_schedule(struct vy_scheduler *scheduler, struct vy_task **ptask) if (*ptask != NULL) return 0; - if (scheduler->workers_available <= 1) { + if (scheduler->idle_worker_count <= 1) { /* * If all worker threads are busy doing compaction * when we run out of quota, ongoing transactions will @@ -1471,17 +1517,16 @@ vy_schedule(struct vy_scheduler *scheduler, struct vy_task **ptask) } static int -vy_scheduler_complete_task(struct vy_scheduler *scheduler, - struct vy_task *task) +vy_task_complete(struct vy_task *task) { if (task->lsm->is_dropped) { if (task->ops->abort) - task->ops->abort(scheduler, task, false); + task->ops->abort(task); return 0; } struct diag *diag = &task->diag; - if (task->status != 0) { + if (task->is_failed) { assert(!diag_is_empty(diag)); goto fail; /* ->execute fialed */ } @@ -1491,7 +1536,7 @@ vy_scheduler_complete_task(struct vy_scheduler *scheduler, diag_move(diag_get(), diag); goto fail; }); if (task->ops->complete && - task->ops->complete(scheduler, task) != 0) { + task->ops->complete(task) != 0) { assert(!diag_is_empty(diag_get())); diag_move(diag_get(), diag); goto fail; @@ -1499,8 +1544,8 @@ vy_scheduler_complete_task(struct vy_scheduler *scheduler, return 0; fail: if (task->ops->abort) - task->ops->abort(scheduler, task, false); - diag_move(diag, &scheduler->diag); + task->ops->abort(task); + diag_move(diag, &task->scheduler->diag); return -1; } @@ -1524,26 +1569,26 @@ vy_scheduler_f(va_list va) vy_scheduler_start_workers(scheduler); while (scheduler->scheduler_fiber != NULL) { - struct stailq output_queue; + struct stailq processed_tasks; struct vy_task *task, *next; int tasks_failed = 0, tasks_done = 0; - bool was_empty; /* Get the list of processed tasks. */ - stailq_create(&output_queue); - tt_pthread_mutex_lock(&scheduler->mutex); - stailq_concat(&output_queue, &scheduler->output_queue); - tt_pthread_mutex_unlock(&scheduler->mutex); + stailq_create(&processed_tasks); + stailq_concat(&processed_tasks, &scheduler->processed_tasks); /* Complete and delete all processed tasks. */ - stailq_foreach_entry_safe(task, next, &output_queue, link) { - if (vy_scheduler_complete_task(scheduler, task) != 0) + stailq_foreach_entry_safe(task, next, &processed_tasks, + in_processed) { + if (vy_task_complete(task) != 0) tasks_failed++; else tasks_done++; - vy_task_delete(&scheduler->task_pool, task); - scheduler->workers_available++; - assert(scheduler->workers_available <= + stailq_add_entry(&scheduler->idle_workers, + task->worker, in_idle); + vy_task_delete(task); + scheduler->idle_worker_count++; + assert(scheduler->idle_worker_count <= scheduler->worker_pool_size); } /* @@ -1557,7 +1602,7 @@ vy_scheduler_f(va_list va) * opens a time window for a worker to submit * a processed task and wake up the scheduler * (via scheduler_async). Hence we should go - * and recheck the output_queue in order not + * and recheck the processed_tasks in order not * to lose a wakeup event and hang for good. */ continue; @@ -1566,7 +1611,7 @@ vy_scheduler_f(va_list va) if (tasks_failed > 0) goto error; /* All worker threads are busy. */ - if (scheduler->workers_available == 0) + if (scheduler->idle_worker_count == 0) goto wait; /* Get a task to schedule. */ if (vy_schedule(scheduler, &task) != 0) @@ -1576,14 +1621,13 @@ vy_scheduler_f(va_list va) goto wait; /* Queue the task and notify workers if necessary. 
*/ - tt_pthread_mutex_lock(&scheduler->mutex); - was_empty = stailq_empty(&scheduler->input_queue); - stailq_add_tail_entry(&scheduler->input_queue, task, link); - if (was_empty) - tt_pthread_cond_signal(&scheduler->worker_cond); - tt_pthread_mutex_unlock(&scheduler->mutex); - - scheduler->workers_available--; + assert(!stailq_empty(&scheduler->idle_workers)); + task->worker = stailq_shift_entry(&scheduler->idle_workers, + struct vy_worker, in_idle); + scheduler->idle_worker_count--; + cmsg_init(&task->cmsg, vy_task_execute_route); + cpipe_push(&task->worker->worker_pipe, &task->cmsg); + fiber_reschedule(); continue; error: @@ -1619,40 +1663,17 @@ vy_scheduler_f(va_list va) return 0; } -static void * -vy_worker_f(void *arg) +static int +vy_worker_f(va_list ap) { - struct vy_scheduler *scheduler = arg; - struct vy_task *task = NULL; - - tt_pthread_mutex_lock(&scheduler->mutex); - while (scheduler->is_worker_pool_running) { - /* Wait for a task */ - if (stailq_empty(&scheduler->input_queue)) { - /* Wake scheduler up if there are no more tasks */ - ev_async_send(scheduler->scheduler_loop, - &scheduler->scheduler_async); - tt_pthread_cond_wait(&scheduler->worker_cond, - &scheduler->mutex); - continue; - } - task = stailq_shift_entry(&scheduler->input_queue, - struct vy_task, link); - tt_pthread_mutex_unlock(&scheduler->mutex); - assert(task != NULL); - - /* Execute task */ - task->status = task->ops->execute(scheduler, task); - if (task->status != 0) { - struct diag *diag = diag_get(); - assert(!diag_is_empty(diag)); - diag_move(diag, &task->diag); - } - - /* Return processed task to scheduler */ - tt_pthread_mutex_lock(&scheduler->mutex); - stailq_add_tail_entry(&scheduler->output_queue, task, link); - } - tt_pthread_mutex_unlock(&scheduler->mutex); - return NULL; + struct vy_worker *worker = va_arg(ap, struct vy_worker *); + struct cbus_endpoint endpoint; + + cpipe_create(&worker->tx_pipe, "tx"); + cbus_endpoint_create(&endpoint, cord_name(&worker->cord), + fiber_schedule_cb, fiber()); + cbus_loop(&endpoint); + cbus_endpoint_destroy(&endpoint, cbus_process); + cpipe_destroy(&worker->tx_pipe); + return 0; } diff --git a/src/box/vy_scheduler.h b/src/box/vy_scheduler.h index 777756c0b0eb16436d9150bce5f2c35f84122ef8..deefacd7c10511866cb8494fb0874ffabc8b73c6 100644 --- a/src/box/vy_scheduler.h +++ b/src/box/vy_scheduler.h @@ -42,16 +42,15 @@ #define HEAP_FORWARD_DECLARATION #include "salad/heap.h" #include "salad/stailq.h" -#include "tt_pthread.h" #if defined(__cplusplus) extern "C" { #endif /* defined(__cplusplus) */ -struct cord; struct fiber; struct vy_lsm; struct vy_run_env; +struct vy_worker; struct vy_scheduler; typedef void @@ -61,41 +60,23 @@ typedef void struct vy_scheduler { /** Scheduler fiber. */ struct fiber *scheduler_fiber; - /** Scheduler event loop. */ - struct ev_loop *scheduler_loop; /** Used to wake up the scheduler fiber from TX. */ struct fiber_cond scheduler_cond; - /** Used to wake up the scheduler from a worker thread. */ - struct ev_async scheduler_async; /** * Array of worker threads used for performing * dump/compaction tasks. */ - struct cord *worker_pool; - /** Set if the worker threads are running. */ - bool is_worker_pool_running; + struct vy_worker *worker_pool; /** Total number of worker threads. */ int worker_pool_size; /** Number worker threads that are currently idle. */ - int workers_available; + int idle_worker_count; + /** List of idle workers, linked by vy_worker::in_idle. 
*/ + struct stailq idle_workers; /** Memory pool used for allocating vy_task objects. */ struct mempool task_pool; - /** Queue of pending tasks, linked by vy_task::link. */ - struct stailq input_queue; - /** Queue of processed tasks, linked by vy_task::link. */ - struct stailq output_queue; - /** - * Signaled to wake up a worker when there is - * a pending task in the input queue. Also used - * to stop worker threads on shutdown. - */ - pthread_cond_t worker_cond; - /** - * Mutex protecting input and output queues and - * the condition variable used to wake up worker - * threads. - */ - pthread_mutex_t mutex; + /** Queue of processed tasks, linked by vy_task::in_processed. */ + struct stailq processed_tasks; /** * Heap of LSM trees, ordered by dump priority, * linked by vy_lsm::in_dump.
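[Editorial aside, not part of the patch: with this rework a dump/compaction task no longer travels through a pthread mutex/condvar queue but makes a cbus round trip between the tx thread and a worker. The sketch below summarizes that round trip using only the types, routes, and calls introduced by the patch itself; error handling and surrounding glue are elided.]

/* In tx: the scheduler fiber hands a task to an idle worker. */
task->worker = stailq_shift_entry(&scheduler->idle_workers,
				  struct vy_worker, in_idle);
scheduler->idle_worker_count--;
cmsg_init(&task->cmsg, vy_task_execute_route);
cpipe_push(&task->worker->worker_pipe, &task->cmsg);

/*
 * In the worker: vy_task_execute_f() starts a fiber running
 * vy_task_f(), which calls task->ops->execute() and then sends
 * the task back over tx_pipe using vy_task_complete_route.
 *
 * Back in tx: vy_task_complete_f() appends the task to
 * scheduler->processed_tasks and signals scheduler_cond, after
 * which the scheduler fiber finishes it via vy_task_complete()
 * and returns the worker to the idle_workers list.
 */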