diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 40fa3fb21b8b619b3606338c94f4c3b2296d7a99..902b7111fdf725c8641246c8d8a788c63f2dc812 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -366,6 +366,11 @@ struct swim { * be triggered by any update of any member. */ struct swim_task round_step_task; + /** + * True if a packet in the round step task is still valid + * and can be resent on a next round step. + */ + bool is_round_packet_valid; /** * Preallocated buffer to store shuffled members here at * the beginning of each round. @@ -416,11 +421,18 @@ struct swim { struct rlist dissemination_queue; }; -/** Reset cached round message on any change of any member. */ +/** + * Mark cached round message invalid on any change of any member. + * It triggers postponed rebuilding of the message. The round + * packet can not be rebuilt right now because 1) invalidation can + * occur several times in row when multiple member attributes are + * updated, or more than one member are added, 2) the message can + * be in fly right now in the output queue inside the scheduler. + */ static inline void swim_cached_round_msg_invalidate(struct swim *swim) { - swim_packet_create(&swim->round_step_task.packet); + swim->is_round_packet_valid = false; } /** Put the member into a list of ACK waiters. */ @@ -847,7 +859,7 @@ swim_encode_dissemination(struct swim *swim, struct swim_packet *packet) static void swim_encode_round_msg(struct swim *swim) { - if (swim_packet_body_size(&swim->round_step_task.packet) > 0) + if (swim->is_round_packet_valid) return; struct swim_packet *packet = &swim->round_step_task.packet; swim_packet_create(packet); @@ -861,6 +873,7 @@ swim_encode_round_msg(struct swim *swim) assert(mp_sizeof_map(map_size) == 1 && map_size >= 2); mp_encode_map(header, map_size); + swim->is_round_packet_valid = true; } /** @@ -964,6 +977,7 @@ swim_send_fd_msg(struct swim *swim, struct swim_task *task, /* * Reset packet allocator in case if task is being reused. */ + assert(! swim_task_is_scheduled(task)); swim_packet_create(&task->packet); char *header = swim_packet_alloc(&task->packet, 1); int map_size = swim_encode_src_uuid(swim, &task->packet);