From b7724150b3017af60b28e3491d971d9eab1d709e Mon Sep 17 00:00:00 2001 From: Alexandr Lyapunov <a.lyapunov@corp.mail.ru> Date: Fri, 16 Sep 2016 17:49:30 +0300 Subject: [PATCH] Splitted vy_run_iterator_search into _search_page and _search_in_page Also removed vlsn from vy_run_iterator_search --- src/box/vinyl.c | 193 +++++++++++++++++++++--------------------------- 1 file changed, 85 insertions(+), 108 deletions(-) diff --git a/src/box/vinyl.c b/src/box/vinyl.c index fb2106d957..166c067c9b 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -5599,67 +5599,6 @@ vy_run_iterator_cmp_pos(struct vy_run_iterator_pos pos1, pos1.pos_in_page > pos2.pos_in_page; } -/** - * Specific middle wide position calculation for binary search - * Till possible, returns position of first record in page - * This behaviour allows to read keys from page index instead of disk - * untill necessary page was found - * @retval 0 success - * @retval -1 or memory or read error - * @retval 1 EOF (possible when page has no records, int bootstrap run) - */ -static int -vy_iterator_pos_mid(struct vy_run_iterator *itr, - struct vy_run_iterator_pos pos1, - struct vy_run_iterator_pos pos2, - struct vy_run_iterator_pos *result) -{ - assert(vy_run_iterator_cmp_pos(pos1, pos2) < 0); - if (pos2.page_no - pos1.page_no > 1) { - assert(pos1.pos_in_page == 0 && pos2.pos_in_page == 0); - result->page_no = - pos1.page_no + (pos2.page_no - pos1.page_no) / 2; - result->pos_in_page = 0; - return 0; - } - struct vy_page *page; - int rc = vy_run_iterator_load_page(itr, pos1.page_no, &page); - if (rc != 0) - return rc; - assert(pos1.page_no == pos2.page_no || pos2.pos_in_page == 0); - uint32_t diff = pos1.page_no == pos2.page_no ? - pos2.pos_in_page - pos1.pos_in_page : - page->count - pos1.pos_in_page; - result->page_no = pos1.page_no; - result->pos_in_page = pos1.pos_in_page + diff / 2; - return result->pos_in_page == page->count ? 1 : 0; -} - -/** - * Specific increment of middle wide position for binary search - * Actually does not do increment until search in page was started. - * @retval 0 success - * @retval -1 memory or read error - */ -static int -vy_iterator_pos_mid_next(struct vy_run_iterator *itr, - struct vy_run_iterator_pos mid, - struct vy_run_iterator_pos end, - struct vy_run_iterator_pos *result) -{ - if (end.page_no - mid.page_no > 1) { - *result = mid; - return 0; - } - struct vy_page *page; - int rc = vy_run_iterator_load_page(itr, mid.page_no, &page); - if (rc != 0) - return rc; - mid.pos_in_page++; - *result = mid.pos_in_page == page->count ? end : mid; - return 0; -} - /** * Read key and lsn by a given wide position. * For the first record in a page reads the result from the page @@ -5685,60 +5624,98 @@ vy_run_iterator_read(struct vy_run_iterator *itr, } /** - * Binary search in a run for the given key and lsn. + * Binary search in page index + * In terms of STL, makes lower_bound for EQ,GE,LT and upper_bound for GT,LE + * Additionally *equal_key argument is set to true if the found value is + * equal to given key (untouched otherwise) + */ +static void +vy_run_iterator_search_page(struct vy_run_iterator *itr, char *key, + uint32_t *page_no, bool *equal_key) +{ + uint32_t beg = 0; + uint32_t end = itr->run->info.count; + /* for upper bound we change zero comparison result to -1 */ + int zero_cmp = itr->order == VINYL_GT || itr->order == VINYL_LE ? -1 : 0; + while (beg != end) { + uint32_t mid = beg + (end - beg) / 2; + struct vy_page_info *page_info = + vy_run_page(itr->run, mid); + char *fnd_key = vy_run_min_key(itr->run, page_info); + int cmp = vy_tuple_compare(fnd_key, key, itr->index->key_def); + cmp = cmp ? cmp : zero_cmp; + *equal_key = *equal_key || cmp == 0; + if (cmp < 0) + beg = mid + 1; + else + end = mid; + } + *page_no = end; +} + +/** + * Binary search in page + * In terms of STL, makes lower_bound for EQ,GE,LT and upper_bound for GT,LE + * Additionally *equal_key argument is set to true if the found value is + * equal to given key (untouched otherwise) + */ +static void +vy_run_iterator_search_in_page(struct vy_run_iterator *itr, char *key, + struct vy_page *page, uint32_t *pos_in_page, + bool *equal_key) +{ + uint32_t beg = 0; + uint32_t end = page->count; + /* for upper bound we change zero comparison result to -1 */ + int zero_cmp = itr->order == VINYL_GT || itr->order == VINYL_LE ? -1 : 0; + while (beg != end) { + uint32_t mid = beg + (end - beg) / 2; + struct vy_tuple_info *info; + const char *fnd_key = vy_page_tuple(page, mid, &info); + int cmp = vy_tuple_compare(fnd_key, key, itr->index->key_def); + cmp = cmp ? cmp : zero_cmp; + *equal_key = *equal_key || cmp == 0; + if (cmp < 0) + beg = mid + 1; + else + end = mid; + } + *pos_in_page = end; +} + +/** + * Binary search in a run for the given key. + * In terms of STL, makes lower_bound for EQ,GE,LT and upper_bound for GT,LE * Resulting wide position is stored it *pos argument - * Note that run is sorted by key ASC and lsn DESC. - * Normally sets the position to first record that greater than given key or - * equal key and not greater lsn, i.e. - * (record.key > key || (record.key == key && record lsn <= lsn)), - * (!) but has a special case of order == VINYL_GT/VINYL_LE, - * when position is set to first record that greater than given key, i.e. - * (record.key > key), - * If that value was not found then position is set to end_pos (invalid pos) - * *equal_key is set to true if found value is equal to key of false otherwise - * Beware of: - * 1) VINYL_GT/VINYL_LE special case - * 2) search with partial key and lsn != INT64_MAX is meaningless - * and dangerous - * 3) if return false, the position was set to maximal lsn of the - * next key + * Additionally *equal_key argument is set to true if the found value is + * equal to given key (untouched otherwise) * * @retval 0 success * @retval -1 read or memory error */ static int -vy_run_iterator_search(struct vy_run_iterator *itr, char *key, int64_t vlsn, +vy_run_iterator_search(struct vy_run_iterator *itr, char *key, struct vy_run_iterator_pos *pos, bool *equal_key) { - struct vy_run_iterator_pos beg = {0, 0}; - struct vy_run_iterator_pos end = {itr->run->info.count, 0}; - *equal_key = false; - while (vy_run_iterator_cmp_pos(beg, end) != 0) { - struct vy_run_iterator_pos mid; - int rc = vy_iterator_pos_mid(itr, beg, end, &mid); - if (rc != 0) - return rc; - const char *fnd_key; - int64_t fnd_lsn; - rc = vy_run_iterator_read(itr, mid, &fnd_key, &fnd_lsn); - if (rc != 0) - return rc; - int cmp = vy_tuple_compare(fnd_key, key, itr->index->key_def); - bool is_equal_key = cmp == 0; - if (cmp == 0 && - (itr->order == VINYL_GT || itr->order == VINYL_LE)) { - cmp = -1; - } - cmp = cmp ? cmp : fnd_lsn > vlsn ? -1 : fnd_lsn < vlsn; - if (cmp < 0) { - if (vy_iterator_pos_mid_next(itr, mid, end, &beg) != 0) - return -1; - } else { - end = mid; - *equal_key = is_equal_key; - } + vy_run_iterator_search_page(itr, key, &pos->page_no, equal_key); + if (pos->page_no == 0) { + pos->pos_in_page = 0; + return 0; + } + pos->page_no--; + struct vy_page *page; + int rc = vy_run_iterator_load_page(itr, pos->page_no, &page); + if (rc != 0) + return rc; + bool equal_in_page = false; + vy_run_iterator_search_in_page(itr, key, page, &pos->pos_in_page, + &equal_in_page); + if (pos->pos_in_page == page->count) { + pos->page_no++; + pos->pos_in_page = 0; + } else { + *equal_key = equal_in_page; } - *pos = end; return 0; } @@ -5911,8 +5888,8 @@ vy_run_iterator_start(struct vy_run_iterator *itr) bool equal_found = false; int rc; if (vy_tuple_key_part(itr->key, 0) != NULL) { - rc = vy_run_iterator_search(itr, itr->key, INT64_MAX, - &itr->curr_pos, &equal_found); + rc = vy_run_iterator_search(itr, itr->key, &itr->curr_pos, + &equal_found); if (rc < 0) return rc; } else if (itr->order == VINYL_LE || itr->order == VINYL_LT) { -- GitLab