diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 886b824022d3a84b7d373488aa26827e093969c4..6e80b2268423804390af39a712d701e62712ba28 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -206,7 +206,7 @@ struct swim_member { */ struct tt_uuid uuid; /** - * Cached hash of the uuid for the members table lookups. + * Cached hash of the uuid for the member table lookups. */ uint32_t hash; /** @@ -406,7 +406,8 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr, diag_set(OutOfMemory, sizeof(mh_int_t), "malloc", "node"); return NULL; } - swim_ev_timer_start(loop(), &swim->round_tick); + if (mh_size(swim->members) > 1) + swim_ev_timer_start(loop(), &swim->round_tick); say_verbose("SWIM %d: member %s is added, total is %d", swim_fd(swim), swim_uuid_str(&member->uuid), mh_size(swim->members)); return member; @@ -448,8 +449,9 @@ swim_new_round(struct swim *swim) swim_fd(swim)); return 0; } + /* -1 for self. */ say_verbose("SWIM %d: start a new round with %d members", swim_fd(swim), - size); + size - 1); swim_shuffle_members(swim); rlist_create(&swim->round_queue); for (int i = 0; i < size; ++i) { @@ -550,7 +552,9 @@ swim_begin_step(struct ev_loop *loop, struct ev_timer *t, int events) (void) events; (void) loop; struct swim *swim = (struct swim *) t->data; - if (rlist_empty(&swim->round_queue) && swim_new_round(swim) != 0) { + if (! rlist_empty(&swim->round_queue)) { + say_verbose("SWIM %d: continue the round", swim_fd(swim)); + } else if (swim_new_round(swim) != 0) { diag_log(); return; } @@ -685,6 +689,7 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def, static int swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end) { + say_verbose("SWIM %d: process anti-entropy", swim_fd(swim)); const char *prefix = "invalid anti-entropy message:"; uint32_t size; if (swim_decode_array(pos, end, &size, prefix, "root") != 0) @@ -737,8 +742,6 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos, goto error; switch(key) { case SWIM_ANTI_ENTROPY: - say_verbose("SWIM %d: process anti-entropy", - swim_fd(swim)); if (swim_process_anti_entropy(swim, &pos, end) != 0) goto error; break; @@ -771,7 +774,8 @@ swim_new(void) swim_ev_timer_init(&swim->round_tick, swim_begin_step, HEARTBEAT_RATE_DEFAULT, 0); swim->round_tick.data = (void *) swim; - swim_task_create(&swim->round_step_task, swim_complete_step, NULL); + swim_task_create(&swim->round_step_task, swim_complete_step, NULL, + "round packet"); swim_scheduler_create(&swim->scheduler, swim_on_input); return swim; } @@ -856,11 +860,12 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, * specified. */ addr = swim->scheduler.transport.addr; + } else { + addr = swim->self->addr; } if (swim->round_tick.at != heartbeat_rate && heartbeat_rate > 0) swim_ev_timer_set(&swim->round_tick, heartbeat_rate, 0); - swim_ev_timer_start(loop(), &swim->round_tick); swim_update_member_addr(swim, swim->self, &addr); int rc = swim_update_member_uuid(swim, swim->self, uuid); /* Reserved above. */ @@ -903,7 +908,7 @@ swim_remove_member(struct swim *swim, const struct tt_uuid *uuid) assert(swim_is_configured(swim)); const char *prefix = "swim.remove_member:"; if (uuid == NULL || tt_uuid_is_nil(uuid)) { - diag_set(SwimError, "%s UUiD is mandatory", prefix); + diag_set(SwimError, "%s UUID is mandatory", prefix); return -1; } struct swim_member *member = swim_find_member(swim, uuid); diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c index 9c16d1ad308943d15062e386f753ee32b48fa0e0..015968a0dec42b54ec9758d9d2168d9a6b66495e 100644 --- a/src/lib/swim/swim_io.c +++ b/src/lib/swim/swim_io.c @@ -57,11 +57,12 @@ swim_packet_create(struct swim_packet *packet) void swim_task_create(struct swim_task *task, swim_task_f complete, - swim_task_f cancel) + swim_task_f cancel, const char *desc) { memset(task, 0, sizeof(*task)); task->complete = complete; task->cancel = cancel; + task->desc = desc; swim_packet_create(&task->packet); rlist_create(&task->in_queue_output); } @@ -170,9 +171,9 @@ swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events) struct swim_task *task = rlist_shift_entry(&scheduler->queue_output, struct swim_task, in_queue_output); - say_verbose("SWIM %d: send to %s", swim_scheduler_fd(scheduler), - sio_strfaddr((struct sockaddr *) &task->dst, - sizeof(task->dst))); + say_verbose("SWIM %d: send %s to %s", swim_scheduler_fd(scheduler), + task->desc, sio_strfaddr((struct sockaddr *) &task->dst, + sizeof(task->dst))); struct swim_meta_header_bin header; swim_meta_header_bin_create(&header, &scheduler->transport.addr); memcpy(task->packet.meta, &header, sizeof(header)); diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h index c1e7d3cc6c48cfcbcb3c5a00052f1170d63d2bb6..fe8e54d112339a8a93a2e73852dc2b1ae2f60023 100644 --- a/src/lib/swim/swim_io.h +++ b/src/lib/swim/swim_io.h @@ -202,6 +202,10 @@ struct swim_task { struct sockaddr_in dst; /** Place in a queue of tasks. */ struct rlist in_queue_output; + /** + * A short description of the packet content. For logging. + */ + const char *desc; }; /** @@ -214,7 +218,7 @@ swim_task_send(struct swim_task *task, const struct sockaddr_in *dst, /** Initialize the task, without scheduling. */ void swim_task_create(struct swim_task *task, swim_task_f complete, - swim_task_f cancel); + swim_task_f cancel, const char *desc); /** Destroy the task, pop from the queue. */ static inline void diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c index eae2abbc18df0bc1c543c5f7f9216b81632dd0f0..dbc446e4032a01671851d18589c37f416a3573e8 100644 --- a/src/lib/swim/swim_proto.c +++ b/src/lib/swim/swim_proto.c @@ -289,6 +289,7 @@ swim_meta_def_decode(struct swim_meta_def *def, const char **pos, if (swim_decode_map(pos, end, &size, prefix, "root") != 0) return -1; memset(def, 0, sizeof(*def)); + def->src.sin_family = AF_INET; for (uint32_t i = 0; i < size; ++i) { uint64_t key; if (swim_decode_uint(pos, end, &key, prefix, "a key") != 0) diff --git a/test/unit/swim.c b/test/unit/swim.c index 29e9eb4f45327729c30413dd5e608d87496c2171..921fc8f073692a457aa3efa7a71ba572d2382a30 100644 --- a/test/unit/swim.c +++ b/test/unit/swim.c @@ -109,7 +109,7 @@ swim_test_uuid_update(void) static void swim_test_cfg(void) { - swim_start_test(15); + swim_start_test(16); struct swim *s = swim_new(); assert(s != NULL); @@ -123,6 +123,9 @@ swim_test_cfg(void) is(swim_cfg(s, uri, -1, &uuid), 0, "configured first time"); is(swim_cfg(s, NULL, -1, NULL), 0, "second time can omit URI, UUID"); is(swim_cfg(s, NULL, 2, NULL), 0, "hearbeat is dynamic"); + const char *self_uri = swim_member_uri(swim_self(s)); + is(strcmp(self_uri, uri), 0, "URI is unchanged after recfg with NULL "\ + "URI"); struct swim *s2 = swim_new(); assert(s2 != NULL); diff --git a/test/unit/swim.result b/test/unit/swim.result index e71d6cfc2b8e40ff3f722be71171141384802718..e8991d8d827baeb04084d0d6a7864400bb5f399f 100644 --- a/test/unit/swim.result +++ b/test/unit/swim.result @@ -19,7 +19,7 @@ ok 2 - subtests ok 3 - subtests *** swim_test_uuid_update: done *** *** swim_test_cfg *** - 1..15 + 1..16 ok 1 - first cfg failed - no URI ok 2 - diag says 'mandatory' ok 3 - first cfg failed - no UUID @@ -27,14 +27,15 @@ ok 3 - subtests ok 5 - configured first time ok 6 - second time can omit URI, UUID ok 7 - hearbeat is dynamic - ok 8 - can not use invalid URI - ok 9 - diag says 'invalid uri' - ok 10 - can not use domain names - ok 11 - diag says 'invalid uri' - ok 12 - UNIX sockets are not supported - ok 13 - diag says 'only IP' - ok 14 - can not bind to an occupied port - ok 15 - diag says 'bind' + ok 8 - URI is unchanged after recfg with NULL URI + ok 9 - can not use invalid URI + ok 10 - diag says 'invalid uri' + ok 11 - can not use domain names + ok 12 - diag says 'invalid uri' + ok 13 - UNIX sockets are not supported + ok 14 - diag says 'only IP' + ok 15 - can not bind to an occupied port + ok 16 - diag says 'bind' ok 4 - subtests *** swim_test_cfg: done *** *** swim_test_add_remove *** diff --git a/test/unit/swim_test_ev.c b/test/unit/swim_test_ev.c index 950784aecc1a55988ff3bd4bfdf3547bef73ecc4..ee1fcdbb7ca9f8c52a639917dd1151fdb8b6dc0a 100644 --- a/test/unit/swim_test_ev.c +++ b/test/unit/swim_test_ev.c @@ -289,12 +289,12 @@ swim_ev_timer_stop(struct ev_loop *loop, struct ev_timer *base) void swim_do_loop_step(struct ev_loop *loop) { - say_verbose("Loop watch %f", watch); struct swim_event *next_e, *e = event_heap_top(&event_heap); if (e != NULL) { assert(e->deadline >= watch); /* Multiple events can have the same deadline. */ watch = e->deadline; + say_verbose("Loop watch %f", watch); do { e->process(e, loop); next_e = event_heap_top(&event_heap); diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c index efae412f84e6de8a3c40a14b3ccc82722c3bdd05..d60344c223f6bf8f6608b4462c71cc9d7171f296 100644 --- a/test/unit/swim_test_transport.c +++ b/test/unit/swim_test_transport.c @@ -79,6 +79,13 @@ swim_test_packet_new(const char *data, int size, const struct sockaddr_in *src, return p; } +/** Free packet memory. */ +static inline void +swim_test_packet_delete(struct swim_test_packet *p) +{ + free(p); +} + /** Fake file descriptor. */ struct swim_fd { /** File descriptor number visible to libev. */ @@ -123,9 +130,9 @@ swim_fd_close(struct swim_fd *fd) { struct swim_test_packet *i, *tmp; rlist_foreach_entry_safe(i, &fd->recv_queue, in_queue, tmp) - free(i); + swim_test_packet_delete(i); rlist_foreach_entry_safe(i, &fd->send_queue, in_queue, tmp) - free(i); + swim_test_packet_delete(i); rlist_del_entry(fd, in_opened); } @@ -189,7 +196,7 @@ swim_transport_recv(struct swim_transport *transport, void *buffer, size_t size, *addr_size = sizeof(p->src); ssize_t result = MIN((size_t) p->size, size); memcpy(buffer, p->data, result); - free(p); + swim_test_packet_delete(p); return result; } diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c index 73f8db40fdabcd2d2a2a33096b1e6d376cce3d48..a92e5523399dc69391ad568e32e4684bc2e1d637 100644 --- a/test/unit/swim_test_utils.c +++ b/test/unit/swim_test_utils.c @@ -31,6 +31,7 @@ #include "swim_test_utils.h" #include "swim_test_ev.h" #include "swim/swim.h" +#include "swim/swim_ev.h" #include "uuid/tt_uuid.h" #include "trivia/util.h" #include "fiber.h" @@ -111,7 +112,7 @@ swim1_contains_swim2(struct swim *s1, struct swim *s2) } } swim_iterator_close(it); - return false; + return true; } bool @@ -129,13 +130,15 @@ swim_cluster_is_fullmesh(struct swim_cluster *cluster) } int -swim_cluster_wait_fullmesh(struct swim_cluster *cluster, int max_steps) +swim_cluster_wait_fullmesh(struct swim_cluster *cluster, double timeout) { - while (! swim_cluster_is_fullmesh(cluster) && max_steps > 0) { + double deadline = swim_time() + timeout; + while (! swim_cluster_is_fullmesh(cluster)) { + if (swim_time() >= deadline) + return -1; swim_do_loop_step(loop()); - --max_steps; } - return max_steps < 0 ? -1 : 0; + return 0; } bool diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h index 56036422d90d8a37758bab4ff34a47afcb7d88c7..90962b658eb212c8770c4f11dfceecfbcce66d6f 100644 --- a/test/unit/swim_test_utils.h +++ b/test/unit/swim_test_utils.h @@ -74,11 +74,9 @@ swim_cluster_add_link(struct swim_cluster *cluster, int to_id, int from_id); bool swim_cluster_is_fullmesh(struct swim_cluster *cluster); -/** - * Wait for fullmesh at most @a max_steps event loop iterations. - */ +/** Wait for fullmesh at most @a timeout fake seconds. */ int -swim_cluster_wait_fullmesh(struct swim_cluster *cluster, int max_steps); +swim_cluster_wait_fullmesh(struct swim_cluster *cluster, double timeout); #define swim_start_test(n) { \ header(); \