diff --git a/src/box/recovery.cc b/src/box/recovery.cc index 64d509893bc6d26f612e7e68d2a69c38d97773b3..cf3a707a3ea32dc32f585970804cd2c8f555e1fe 100644 --- a/src/box/recovery.cc +++ b/src/box/recovery.cc @@ -277,10 +277,12 @@ recover_xlog(struct recovery *r, struct xstream *stream, say_info("%.1fM rows processed", row_count / 1000000.); } else { - say_error("can't apply row: "); - diag_log(); if (!r->wal_dir.force_recovery) diag_raise(); + + say_error("skipping row {%u: %lld}", + (unsigned)row.replica_id, (long long)row.lsn); + diag_log(); } } } diff --git a/src/box/relay.cc b/src/box/relay.cc index 5cd754d9b9933959551489b86ca3f72428922760..48b1648f92db8cfb0dd6ba1737bcf7b07f1e62b1 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -404,6 +404,14 @@ relay_schedule_pending_gc(struct relay *relay, const struct vclock *vclock) cpipe_push(&relay->tx_pipe, &gc_msg->msg); } +static void +relay_set_error(struct relay *relay, struct error *e) +{ + /* Don't override existing error. */ + if (diag_is_empty(&relay->diag)) + diag_add_error(&relay->diag, e); +} + static void relay_process_wal_event(struct wal_watcher *watcher, unsigned events) { @@ -424,8 +432,7 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events) recover_remaining_wals(relay->r, &relay->stream, NULL, (events & WAL_EVENT_ROTATE) != 0); } catch (Exception *e) { - e->log(); - diag_move(diag_get(), &relay->diag); + relay_set_error(relay, e); fiber_cancel(fiber()); } } @@ -455,17 +462,8 @@ relay_reader_f(va_list ap) fiber_cond_signal(&relay->reader_cond); } } catch (Exception *e) { - if (diag_is_empty(&relay->diag)) { - /* Don't override existing error. */ - diag_move(diag_get(), &relay->diag); - fiber_cancel(relay_f); - } else if (!fiber_is_cancelled()) { - /* - * There is an relay error and this fiber - * fiber has another, log it. - */ - e->log(); - } + relay_set_error(relay, e); + fiber_cancel(relay_f); } ibuf_destroy(&ibuf); return 0; @@ -482,7 +480,8 @@ relay_send_heartbeat(struct relay *relay) try { relay_send(relay, &row); } catch (Exception *e) { - e->log(); + relay_set_error(relay, e); + fiber_cancel(fiber()); } } @@ -498,20 +497,25 @@ relay_subscribe_f(va_list ap) struct recovery *r = relay->r; coio_enable(); - cbus_endpoint_create(&relay->endpoint, cord_name(cord()), + relay_set_cord_name(relay->io.fd); + + /* Create cpipe to tx for propagating vclock. */ + cbus_endpoint_create(&relay->endpoint, tt_sprintf("relay_%p", relay), fiber_schedule_cb, fiber()); - cbus_pair("tx", cord_name(cord()), &relay->tx_pipe, &relay->relay_pipe, - NULL, NULL, cbus_process); + cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe, + &relay->relay_pipe, NULL, NULL, cbus_process); + /* Setup garbage collection trigger. */ struct trigger on_close_log = { RLIST_LINK_INITIALIZER, relay_on_close_log_f, relay, NULL }; trigger_add(&r->on_close_log, &on_close_log); - wal_set_watcher(&relay->wal_watcher, cord_name(cord()), - relay_process_wal_event, cbus_process); - relay_set_cord_name(relay->io.fd); + /* Setup WAL watcher for sending new rows to the replica. */ + wal_set_watcher(&relay->wal_watcher, relay->endpoint.name, + relay_process_wal_event, cbus_process); + /* Start fiber for receiving replica acks. */ char name[FIBER_NAME_MAX]; snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader"); struct fiber *reader = fiber_new_xc(name, relay_reader_f); @@ -526,6 +530,10 @@ relay_subscribe_f(va_list ap) */ relay_send_heartbeat(relay); + /* + * Run the event loop until the connection is broken + * or an error occurs. + */ while (!fiber_is_cancelled()) { double timeout = replication_timeout; struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL, @@ -570,26 +578,33 @@ relay_subscribe_f(va_list ap) relay_schedule_pending_gc(relay, send_vclock); } + /* + * Log the error that caused the relay to break the loop. + * Don't clear the error for status reporting. + */ + assert(!diag_is_empty(&relay->diag)); + diag_add_error(diag_get(), diag_last_error(&relay->diag)); + diag_log(); say_crit("exiting the relay loop"); + + /* Clear garbage collector trigger and WAL watcher. */ trigger_clear(&on_close_log); wal_clear_watcher(&relay->wal_watcher, cbus_process); - if (!fiber_is_dead(reader)) - fiber_cancel(reader); + + /* Join ack reader fiber. */ + fiber_cancel(reader); fiber_join(reader); + + /* Destroy cpipe to tx. */ cbus_unpair(&relay->tx_pipe, &relay->relay_pipe, NULL, NULL, cbus_process); cbus_endpoint_destroy(&relay->endpoint, cbus_process); - if (!diag_is_empty(&relay->diag)) { - /* An error has occurred while reading ACKs of xlog. */ - diag_move(&relay->diag, diag_get()); - /* Reference the diag in the status. */ - diag_add_error(&relay->diag, diag_last_error(diag_get())); - } + struct errinj *inj = errinj(ERRINJ_RELAY_EXIT_DELAY, ERRINJ_DOUBLE); if (inj != NULL && inj->dparam > 0) fiber_sleep(inj->dparam); - return diag_is_empty(diag_get()) ? 0: -1; + return -1; } /** Replication acceptor fiber handler. */ @@ -619,7 +634,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync, vclock_copy(&relay->tx.vclock, replica_clock); relay->version_id = replica_version_id; - int rc = cord_costart(&relay->cord, tt_sprintf("relay_%p", relay), + int rc = cord_costart(&relay->cord, "subscribe", relay_subscribe_f, relay); if (rc == 0) rc = cord_cojoin(&relay->cord);