diff --git a/src/box/recovery.cc b/src/box/recovery.cc index b2868fc64d0a33707e5b750ba88a7df4f43126f5..0cfb9cfeb334880f39f1805a76d6c6a9166dfc02 100644 --- a/src/box/recovery.cc +++ b/src/box/recovery.cc @@ -408,7 +408,9 @@ recover_remaining_wals(struct recovery_state *r) } while (1) { - current_vclock = vclockset_isearch(&r->wal_dir.index, &r->vclock); + current_vclock = vclockset_match(&r->wal_dir.index, + &r->vclock, + r->wal_dir.panic_if_error); if (current_vclock == NULL) break; /* No more WALs */ diff --git a/src/box/vclock.c b/src/box/vclock.c index 7e564fb578a6cac36da099bab1d149112005ed4d..a01a21dbd49887f584ac6b862c0831c09afb8a28 100644 --- a/src/box/vclock.c +++ b/src/box/vclock.c @@ -191,6 +191,12 @@ static int vclockset_node_compare(const struct vclock *a, const struct vclock *b) { int res = vclock_compare(a, b); + /* + * In a vclock set, we do not allow vclock objects which + * are note strictly ordered. + * See also xdir_scan(), in which we check & skip + * duplicate vclocks. + */ if (res == VCLOCK_ORDER_UNDEFINED) return 0; return res; diff --git a/src/box/vclock.h b/src/box/vclock.h index c041215f989262875f9fe63817339e9a7b539fc3..ae3a4b8a5167ff3f08c6fac6be25c2623ced130b 100644 --- a/src/box/vclock.h +++ b/src/box/vclock.h @@ -181,21 +181,39 @@ typedef rb_tree(struct vclock) vclockset_t; rb_proto(, vclockset_, vclockset_t, struct vclock); /** - * @brief Inclusive search - * @param set - * @param key + * A proximity search in a set of vclock objects. + * + * The set is normally the index of vclocks in the binary + * log files of the current directory. The task of the search is + * to find the first log, + * * @return a vclock that <= than \a key */ static inline struct vclock * -vclockset_isearch(vclockset_t *set, struct vclock *key) +vclockset_match(vclockset_t *set, struct vclock *key, + bool panic_if_error) { - struct vclock *res = vclockset_psearch(set, key); - while (res != NULL) { - if (vclock_compare(res, key) <= 0) - return res; - res = vclockset_prev(set, res); + struct vclock *match = vclockset_psearch(set, key); + /** + * vclockset comparator returns 0 for + * incomparable keys. So the match doesn't have to be + * strictly preceding the search key, it may be + * incomparable. If this is the case, unwind until + * we get to a key which is strictly below the search + * pattern. + */ + while (match != NULL) { + if (vclock_compare(match, key) <= 0) + return match; + /* The order is undefined, try the previous vclock. */ + match = vclockset_prev(set, match); } - return NULL; + /* + * There is no xlog which is strictly less than the search + * pattern. Return the fist successor log - it is either + * strictly greater, or incomparable with the key. + */ + return panic_if_error ? NULL: vclockset_first(set); } #if defined(__cplusplus) diff --git a/test/unit/vclock.cc b/test/unit/vclock.cc index 172d2637481f0912643acfed1a0314315ebfe5e1..282a88cac8e8d4d63b2030c4e42f6e333d11b0df 100644 --- a/test/unit/vclock.cc +++ b/test/unit/vclock.cc @@ -231,7 +231,7 @@ test_isearch() } int64_t check = *(query + NODE_N); - struct vclock *res = vclockset_isearch(&set, &vclock); + struct vclock *res = vclockset_match(&set, &vclock, true); int64_t value = res != NULL ? vclock_signature(res) : INT64_MAX; is(value, check, "query #%d", q + 1); }