Skip to content
Snippets Groups Projects
Commit b51b7a0f authored by Vladimir Davydov's avatar Vladimir Davydov Committed by Konstantin Osipov
Browse files

relay: cleanup error handling

 - Handle cord_costart() errors properly.
 - Don't use scoped_guard as we don't actually need it there.
 - Drop relay_destroy() and rename relay_create() to relay_init().
parent 2b5aa3fc
No related branches found
No related tags found
No related merge requests found
......@@ -37,7 +37,6 @@
#include "errinj.h"
#include "fiber.h"
#include "say.h"
#include "scoped_guard.h"
#include "coio.h"
#include "coio_task.h"
......@@ -132,8 +131,8 @@ static void
relay_send_row(struct xstream *stream, struct xrow_header *row);
static inline void
relay_create(struct relay *relay, int fd, uint64_t sync,
void (*stream_write)(struct xstream *, struct xrow_header *))
relay_init(struct relay *relay, int fd, uint64_t sync,
void (*stream_write)(struct xstream *, struct xrow_header *))
{
memset(relay, 0, sizeof(*relay));
xstream_create(&relay->stream, stream_write);
......@@ -141,12 +140,6 @@ relay_create(struct relay *relay, int fd, uint64_t sync,
relay->sync = sync;
}
static inline void
relay_destroy(struct relay *relay)
{
(void) relay;
}
static inline void
relay_set_cord_name(int fd)
{
......@@ -166,11 +159,7 @@ void
relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
{
struct relay relay;
relay_create(&relay, fd, sync, relay_send_initial_join_row);
auto scope_guard = make_scoped_guard([&]{
relay_destroy(&relay);
});
relay_init(&relay, fd, sync, relay_send_initial_join_row);
assert(relay.stream.write != NULL);
engine_join(vclock, &relay.stream);
}
......@@ -194,19 +183,22 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
struct vclock *stop_vclock)
{
struct relay relay;
relay_create(&relay, fd, sync, relay_send_row);
relay_init(&relay, fd, sync, relay_send_row);
relay.r = recovery_new(cfg_gets("wal_dir"),
cfg_geti("force_recovery"),
start_vclock);
vclock_copy(&relay.stop_vclock, stop_vclock);
auto scope_guard = make_scoped_guard([&]{
recovery_delete(relay.r);
relay_destroy(&relay);
});
cord_costart(&relay.cord, "final_join", relay_final_join_f, &relay);
if (cord_cojoin(&relay.cord) != 0)
int rc = cord_costart(&relay.cord, "final_join",
relay_final_join_f, &relay);
if (rc == 0)
rc = cord_cojoin(&relay.cord);
recovery_delete(relay.r);
if (rc != 0)
diag_raise();
ERROR_INJECT(ERRINJ_RELAY_FINAL_SLEEP, {
while (vclock_compare(stop_vclock, &replicaset_vclock) == 0)
fiber_sleep(0.001);
......@@ -295,8 +287,8 @@ relay_subscribe_f(va_list ap)
{
struct relay *relay = va_arg(ap, struct relay *);
struct recovery *r = relay->r;
coio_enable();
relay->stream.write = relay_send_row;
cbus_endpoint_create(&relay->endpoint, cord_name(cord()),
fiber_schedule_cb, fiber());
cbus_pair("tx", cord_name(cord()), &relay->tx_pipe, &relay->relay_pipe,
......@@ -308,19 +300,7 @@ relay_subscribe_f(va_list ap)
trigger_add(&r->on_close_log, &on_close_log);
wal_set_watcher(&relay->wal_watcher, cord_name(cord()),
relay_process_wal_event, cbus_process);
/*
* Create a guard to detach the relay from cbus and
* clear the garbage collection trigger on exit.
*/
auto guard = make_scoped_guard([&]{
trigger_clear(&on_close_log);
say_crit("exiting the relay loop");
relay->exiting = true;
wal_clear_watcher(&relay->wal_watcher, cbus_process);
cbus_unpair(&relay->tx_pipe, &relay->relay_pipe,
NULL, NULL, cbus_process);
cbus_endpoint_destroy(&relay->endpoint, cbus_process);
});
relay_set_cord_name(relay->io.fd);
/*
......@@ -351,7 +331,7 @@ relay_subscribe_f(va_list ap)
cbus_process(&relay->endpoint);
if (relay->failed)
diag_raise();
break;
uint8_t data;
int rc = recv(read_ev.fd, &data, sizeof(data), 0);
......@@ -380,7 +360,16 @@ relay_subscribe_f(va_list ap)
relay->status_msg.relay = relay;
cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
}
return 0;
say_crit("exiting the relay loop");
relay->exiting = true;
trigger_clear(&on_close_log);
wal_clear_watcher(&relay->wal_watcher, cbus_process);
cbus_unpair(&relay->tx_pipe, &relay->relay_pipe,
NULL, NULL, cbus_process);
cbus_endpoint_destroy(&relay->endpoint, cbus_process);
return relay->failed ? -1 : 0;
}
/** Replication acceptor fiber handler. */
......@@ -409,7 +398,7 @@ relay_subscribe(int fd, uint64_t sync, struct replica *replica,
}
struct relay relay;
relay_create(&relay, fd, sync, relay_send_row);
relay_init(&relay, fd, sync, relay_send_row);
relay.r = recovery_new(cfg_gets("wal_dir"),
cfg_geti("force_recovery"),
replica_clock);
......@@ -417,19 +406,16 @@ relay_subscribe(int fd, uint64_t sync, struct replica *replica,
relay.replica = replica;
replica_set_relay(replica, &relay);
auto scope_guard = make_scoped_guard([&]{
replica_clear_relay(replica);
recovery_delete(relay.r);
relay_destroy(&relay);
});
int rc = cord_costart(&relay.cord, tt_sprintf("relay_%p", &relay),
relay_subscribe_f, &relay);
if (rc == 0)
rc = cord_cojoin(&relay.cord);
struct cord cord;
char name[FIBER_NAME_MAX];
snprintf(name, sizeof(name), "relay_%p", &relay);
cord_costart(&cord, name, relay_subscribe_f, &relay);
if (cord_cojoin(&cord) != 0) {
replica_clear_relay(replica);
recovery_delete(relay.r);
if (rc != 0)
diag_raise();
}
}
static void
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment