diff --git a/src/box/applier.cc b/src/box/applier.cc index dec0c355c0149a9ae2c9593acec42c223654734e..a44347fe0bdbb147573fa4af4ad331e515667af1 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -2659,7 +2659,7 @@ applier_stop(struct applier *applier) } struct applier * -applier_new(struct uri *uri) +applier_new(const struct uri *uri) { struct applier *applier = (struct applier *) xcalloc(1, sizeof(struct applier)); @@ -2670,7 +2670,7 @@ applier_new(struct uri *uri) iostream_clear(&applier->io); ibuf_create(&applier->ibuf, &cord()->slabc, 1024); - uri_move(&applier->uri, uri); + uri_copy(&applier->uri, uri); applier->last_row_time = ev_monotonic_now(loop()); rlist_create(&applier->on_state); rlist_create(&applier->on_ballot_update); diff --git a/src/box/applier.h b/src/box/applier.h index 9930138972c22ebc26c13b2d3148cbdb48893b63..15c41ffe71a7731500b372700d2698dae58a639e 100644 --- a/src/box/applier.h +++ b/src/box/applier.h @@ -308,7 +308,7 @@ applier_stop(struct applier *applier); * @error throws exception */ struct applier * -applier_new(struct uri *uri); +applier_new(const struct uri *uri); /** * Destroy and delete a applier. diff --git a/src/box/box.cc b/src/box/box.cc index 7bb297100772806a1f80d8f80ec86bc2f7aaa405..4c125704cd6ddff52f6b140836aa45e4a26a3ffe 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1728,21 +1728,7 @@ box_sync_replication(bool do_quorum, bool do_reuse) auto uri_set_guard = make_scoped_guard([&]{ uri_set_destroy(&uri_set); }); - if (uri_set.uri_count >= VCLOCK_MAX) { - tnt_raise(ClientError, ER_CFG, "replication", - "too many replicas"); - } - int count = 0; - struct applier *appliers[VCLOCK_MAX] = {}; - auto appliers_guard = make_scoped_guard([&]{ - for (int i = 0; i < count; i++) - applier_delete(appliers[i]); /* doesn't affect diag */ - }); - for (; count < uri_set.uri_count; count++) { - appliers[count] = applier_new(&uri_set.uris[count]); - } - replicaset_connect(appliers, count, do_quorum, do_reuse); - appliers_guard.is_active = false; + replicaset_connect(&uri_set, do_quorum, do_reuse); } static inline void diff --git a/src/box/replication.cc b/src/box/replication.cc index 074fc42f45a3ab5269e2ca20a6f8d6f4b00fe163..27aa447f4da87ba1e6d0d7ec28b56e0a763b2248 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -450,7 +450,7 @@ replicaset_on_health_change(void) trigger_run(&replicaset_on_quorum_loss, NULL); } -void +static void replica_set_applier(struct replica *replica, struct applier *applier) { assert(replica->applier == NULL); @@ -482,7 +482,7 @@ replica_update_applier_health(struct replica *replica) replicaset_on_health_change(); } -void +static void replica_clear_applier(struct replica *replica) { assert(replica->applier != NULL); @@ -973,14 +973,29 @@ replicaset_is_connected(struct replicaset_connect_state *state, } void -replicaset_connect(struct applier **appliers, int count, +replicaset_connect(const struct uri_set *uris, bool connect_quorum, bool keep_connect) { - if (count == 0) { + if (uris->uri_count == 0) { /* Cleanup the replica set. */ - replicaset_update(appliers, 0, false); + replicaset_update(NULL, 0, false); return; } + if (uris->uri_count >= VCLOCK_MAX) { + tnt_raise(ClientError, ER_CFG, "replication", + "too many replicas"); + } + int count = 0; + struct applier *appliers[VCLOCK_MAX] = {}; + auto appliers_guard = make_scoped_guard([&]{ + for (int i = 0; i < count; i++) { + struct applier *applier = appliers[i]; + applier_stop(applier); + applier_delete(applier); + } + }); + for (; count < uris->uri_count; count++) + appliers[count] = applier_new(&uris->uris[count]); say_info("connecting to %d replicas", count); @@ -1045,9 +1060,8 @@ replicaset_connect(struct applier **appliers, int count, /* Timeout or connection failure. */ if (state.connected < replicaset_connect_quorum(count) && connect_quorum) { - diag_set(ClientError, ER_CFG, "replication", - "failed to connect to one or more replicas"); - goto error; + tnt_raise(ClientError, ER_CFG, "replication", + "failed to connect to one or more replicas"); } } else { say_info("connected to %d replicas", state.connected); @@ -1067,19 +1081,8 @@ replicaset_connect(struct applier **appliers, int count, } /* Now all the appliers are connected, update the replica set. */ - try { - replicaset_update(appliers, count, keep_connect); - } catch (Exception *e) { - goto error; - } - return; -error: - /* Destroy appliers */ - for (int i = 0; i < count; i++) { - trigger_clear(&triggers[i].base); - applier_stop(appliers[i]); - } - diag_raise(); + replicaset_update(appliers, count, keep_connect); + appliers_guard.is_active = false; } bool diff --git a/src/box/replication.h b/src/box/replication.h index 059fa9685ab0223b1a67269e7ac470ef1a475816..00bb2da13c468d2a177d084c49a1360aaaf534a4 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -457,12 +457,6 @@ replica_set_id(struct replica *replica, uint32_t id); void replica_clear_id(struct replica *replica); -void -replica_clear_applier(struct replica *replica); - -void -replica_set_applier(struct replica * replica, struct applier * applier); - /** * Check if there are enough "healthy" connections, and fire the appropriate * triggers. A replica connection is considered "healthy", when: @@ -574,9 +568,8 @@ replicaset_add_anon(const struct tt_uuid *replica_uuid); * \post appliers are connected to remote hosts and paused. * Use replicaset_follow() to resume appliers. * - * \param appliers the array of appliers - * \param count size of appliers array - * \param timeout connection timeout + * \param uris remote peer URIs + * \param timeout connection timeout * \param connect_quorum if this flag is set, fail unless at * least replication_connect_quorum * appliers have successfully connected. @@ -584,7 +577,7 @@ replicaset_add_anon(const struct tt_uuid *replica_uuid); * old connection to the replica is fine. */ void -replicaset_connect(struct applier **appliers, int count, +replicaset_connect(const struct uri_set *uris, bool connect_quorum, bool keep_connect); /**