Skip to content
Snippets Groups Projects
Commit ce1a41b1 authored by Sulverus's avatar Sulverus
Browse files

Merge branch 'master' into gh-668

parents fb4253c6 da4f159a
No related branches found
No related tags found
No related merge requests found
......@@ -100,9 +100,11 @@ lbox_info_vclock(struct lua_State *L)
lua_createtable(L, 0, vclock_size(&recovery->vclock));
/* Request compact output flow */
luaL_setmaphint(L, -1);
vclock_foreach(&recovery->vclock, it) {
lua_pushinteger(L, it.id);
luaL_pushuint64(L, it.lsn);
struct vclock_iterator it;
vclock_iterator_init(&it, &recovery->vclock);
vclock_foreach(&it, server) {
lua_pushinteger(L, server.id);
luaL_pushuint64(L, server.lsn);
lua_settable(L, -3);
}
......
......@@ -966,7 +966,7 @@ wal_write(struct recovery_state *r, struct xrow_header *row)
*/
fill_lsn(r, row);
if (r->wal_mode == WAL_NONE)
return vclock_sum(&r->vclock);
return vclock_signature(&r->vclock);
ERROR_INJECT_RETURN(ERRINJ_WAL_IO);
......@@ -1002,7 +1002,7 @@ wal_write(struct recovery_state *r, struct xrow_header *row)
fiber_set_cancellable(cancellable);
if (req->res == -1)
return -1;
return vclock_sum(&r->vclock);
return vclock_signature(&r->vclock);
}
/* }}} */
......
......@@ -45,8 +45,10 @@ vclock_follow(struct vclock *vclock, uint32_t server_id, int64_t lsn)
(unsigned) server_id,
(long long) prev_lsn, (long long) lsn);
}
vclock->signature += lsn - ((prev_lsn >= 0) ? prev_lsn : 0);
/* Easier add each time than check. */
vclock_add_server_nothrow(vclock, server_id);
vclock->lsn[server_id] = lsn;
vclock->signature += lsn - prev_lsn;
return prev_lsn;
}
......@@ -95,9 +97,11 @@ vclock_to_string(const struct vclock *vclock)
return NULL;
const char *sep = "";
vclock_foreach(vclock, it) {
if (rsnprintf(&buf, &pos, &end, "%s%u: %lld", sep, it.id,
(long long) it.lsn) != 0)
struct vclock_iterator it;
vclock_iterator_init(&it, vclock);
vclock_foreach(&it, server) {
if (rsnprintf(&buf, &pos, &end, "%s%u: %lld", sep,
server.id, (long long) server.lsn) != 0)
return NULL;
sep = ", ";
}
......@@ -156,8 +160,9 @@ vclock_from_string(struct vclock *vclock, const char *str)
errno = 0;
lsn = strtoll(p, (char **) &p, 10);
if (errno != 0 || lsn < 0 || lsn > INT64_MAX ||
vclock->lsn[server_id] != -1)
vclock_has(vclock, server_id))
goto error;
vclock_add_server_nothrow(vclock, server_id);
vclock->lsn[server_id] = lsn;
goto comma;
}
......@@ -192,8 +197,8 @@ vclockset_node_compare(const struct vclock *a, const struct vclock *b)
{
int res = vclock_compare(a, b);
/*
* In a vclock set, we do not allow vclock objects which
* are note strictly ordered.
* In a vclock set, we do not allow clocks which are not
* strictly ordered.
* See also xdir_scan(), in which we check & skip
* duplicate vclocks.
*/
......
......@@ -38,6 +38,8 @@
#define RB_COMPACT 1
#include <third_party/rb.h>
#include "bit/bit.h"
#if defined(__cplusplus)
extern "C" {
#endif /* defined(__cplusplus) */
......@@ -46,6 +48,9 @@ enum { VCLOCK_MAX = 16 };
/** Cluster vector clock */
struct vclock {
/** Map of used components in lsn array */
unsigned int map;
/** Sum of all components of vclock. */
int64_t signature;
int64_t lsn[VCLOCK_MAX];
/** To order binary logs by vector clock. */
......@@ -58,22 +63,45 @@ struct vclock_c {
int64_t lsn;
};
#define vclock_foreach(vclock, var) \
for (struct vclock_c (var) = {0, 0}; \
(var).id < VCLOCK_MAX; (var).id++) \
if (((var).lsn = (vclock)->lsn[(var).id]) >= 0)
struct vclock_iterator
{
struct bit_iterator it;
const struct vclock *vclock;
};
static inline void
vclock_iterator_init(struct vclock_iterator *it, const struct vclock *vclock)
{
it->vclock = vclock;
bit_iterator_init(&it->it, &vclock->map, sizeof(vclock->map), true);
}
static inline struct vclock_c
vclock_iterator_next(struct vclock_iterator *it)
{
struct vclock_c c;
size_t id = bit_iterator_next(&it->it);
c.id = id == SIZE_MAX ? (int) VCLOCK_MAX : id;
if (c.id < VCLOCK_MAX)
c.lsn = it->vclock->lsn[c.id];
return c;
}
#define vclock_foreach(it, var) \
for (struct vclock_c (var) = vclock_iterator_next(it); \
(var).id < VCLOCK_MAX; (var) = vclock_iterator_next(it))
static inline void
vclock_create(struct vclock *vclock)
{
memset(vclock, 0xff, sizeof(*vclock));
vclock->signature = 0;
memset(vclock, 0, sizeof(*vclock));
}
static inline bool
vclock_has(const struct vclock *vclock, uint32_t server_id)
{
return server_id < VCLOCK_MAX && vclock->lsn[server_id] >= 0;
return server_id < VCLOCK_MAX && (vclock->map & (1 << server_id));
}
static inline int64_t
......@@ -99,17 +127,16 @@ vclock_copy(struct vclock *dst, const struct vclock *src)
static inline uint32_t
vclock_size(const struct vclock *vclock)
{
int32_t size = 0;
vclock_foreach(vclock, pair)
++size;
return size;
return __builtin_popcount(vclock->map);
}
static inline int64_t
vclock_sum(const struct vclock *vclock)
{
int64_t sum = 0;
vclock_foreach(vclock, server)
struct vclock_iterator it;
vclock_iterator_init(&it, vclock);
vclock_foreach(&it, server)
sum += server.lsn;
return sum;
}
......@@ -120,6 +147,12 @@ vclock_signature(const struct vclock *vclock)
return vclock->signature;
}
static inline void
vclock_add_server_nothrow(struct vclock *vclock, uint32_t server_id)
{
vclock->map |= 1 << server_id;
}
int64_t
vclock_follow(struct vclock *vclock, uint32_t server_id, int64_t lsn);
......@@ -159,23 +192,19 @@ static inline int
vclock_compare(const struct vclock *a, const struct vclock *b)
{
bool le = true, ge = true;
for (uint32_t server_id = 0; server_id < VCLOCK_MAX; server_id++) {
unsigned int map = a->map | b->map;
struct bit_iterator it;
bit_iterator_init(&it, &map, sizeof(map), true);
for (size_t server_id = bit_iterator_next(&it); server_id < VCLOCK_MAX;
server_id = bit_iterator_next(&it)) {
int64_t lsn_a = a->lsn[server_id];
int64_t lsn_b = b->lsn[server_id];
if (lsn_a > 0 || lsn_b > 0) {
/*
* At least one of the clocks must have
* events for this server id. The case
* when one of the clocks lists the server
* as "present" and another doesn't yet
* know about is possible when reading
* and relaying rows to a replica.
*/
le = le && lsn_a <= lsn_b;
ge = ge && lsn_a >= lsn_b;
if (!ge && !le)
return VCLOCK_ORDER_UNDEFINED;
}
le = le && lsn_a <= lsn_b;
ge = ge && lsn_a >= lsn_b;
if (!ge && !le)
return VCLOCK_ORDER_UNDEFINED;
}
if (ge && !le)
return 1;
......@@ -255,14 +284,15 @@ vclock_add_server(struct vclock *vclock, uint32_t server_id)
if (server_id >= VCLOCK_MAX)
tnt_raise(ClientError, ER_REPLICA_MAX, server_id);
assert(! vclock_has(vclock, server_id));
vclock->lsn[server_id] = 0;
vclock_add_server_nothrow(vclock, server_id);
}
static inline void
vclock_del_server(struct vclock *vclock, uint32_t server_id)
{
assert(vclock_has(vclock, server_id));
vclock->lsn[server_id] = -1;
vclock->lsn[server_id] = 0;
vclock->map &= ~(1 << server_id);
vclock->signature = vclock_sum(vclock);
}
......
......@@ -276,7 +276,9 @@ xrow_encode_subscribe(struct xrow_header *row,
data = xrow_encode_uuid(data, server_uuid);
data = mp_encode_uint(data, IPROTO_VCLOCK);
data = mp_encode_map(data, cluster_size);
vclock_foreach(vclock, server) {
struct vclock_iterator it;
vclock_iterator_init(&it, vclock);
vclock_foreach(&it, server) {
data = mp_encode_uint(data, server.id);
data = mp_encode_uint(data, server.lsn);
}
......@@ -289,7 +291,7 @@ xrow_encode_subscribe(struct xrow_header *row,
void
xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *cluster_uuid,
struct tt_uuid *server_uuid, struct vclock *vclock)
struct tt_uuid *server_uuid, struct vclock *vclock)
{
if (row->bodycnt == 0)
tnt_raise(ClientError, ER_INVALID_MSGPACK, "request body");
......@@ -351,7 +353,9 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *cluster_uuid,
if (mp_typeof(*d) != MP_UINT)
goto map_error;
int64_t lsn = (int64_t) mp_decode_uint(&d);
vclock_follow(vclock, id, lsn);
vclock_add_server(vclock, id);
if (lsn > 0)
vclock_follow(vclock, id, lsn);
}
}
......@@ -389,7 +393,9 @@ xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock)
data = mp_encode_map(data, 1);
data = mp_encode_uint(data, IPROTO_VCLOCK);
data = mp_encode_map(data, cluster_size);
vclock_foreach(vclock, server) {
struct vclock_iterator it;
vclock_iterator_init(&it, vclock);
vclock_foreach(&it, server) {
data = mp_encode_uint(data, server.id);
data = mp_encode_uint(data, server.lsn);
}
......
......@@ -447,17 +447,12 @@ bit_iterator_init(struct bit_iterator *it, const void *data, size_t size,
const char *e = it->next + size % sizeof(ITER_UINT);
if (bit_likely(it->next == e)) {
it->word = *(ITER_UINT *) it->next;
it->word ^= it->word_xor;
it->next += sizeof(ITER_UINT);
return;
}
it->word = it->word_xor;
char *w = (char *) &it->word;
while (it->next < e) {
*w = *it->next;
it->next++;
w++;
} else {
it->word = it->word_xor;
char *w = (char *) &it->word;
while (it->next < e)
*w++ = *it->next++;
}
it->word ^= it->word_xor;
}
......@@ -483,7 +478,7 @@ bit_iterator_next(struct bit_iterator *it)
it->next += sizeof(ITER_UINT);
}
/* Find the position of a first traling bit in the current word */
/* Find the position of a first trailing bit in the current word */
int bit = ITER_CTZ(it->word);
/* Remove the first trailing bit from the current word */
it->word &= it->word - 1;
......
......@@ -28,7 +28,6 @@
*/
extern "C" {
#include "unit.h"
#include "unit.h"
} /* extern "C" */
#include <stdarg.h>
......@@ -248,12 +247,17 @@ test_tostring_one(uint32_t count, const int64_t *lsns, const char *res)
struct vclock vclock;
vclock_create(&vclock);
for (uint32_t node_id = 0; node_id < count; node_id++) {
vclock.lsn[node_id] = lsns[node_id];
if (lsns[node_id] >= 0)
vclock_add_server(&vclock, node_id);
if (lsns[node_id] > 0)
vclock_follow(&vclock, node_id, lsns[node_id]);
}
char *str = vclock_to_string(&vclock);
int result = strcmp(str, res) == 0;
int result = strcmp(str, res);
if (result)
diag("\n!!!new result!!! %s\n", str);
free(str);
return result;
return !result;
}
#define test(xa, res) ({\
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment