diff --git a/src/box/relay.cc b/src/box/relay.cc index 7cb7a1f704b87ddf99e068c7df6300171140e344..e5710f6be3334996756b7f25390d9d0a4500f569 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -735,6 +735,28 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events) } } +/** Process the last received ACK from applier. */ +static void +relay_process_ack(struct relay *relay, double tm) +{ + if (tm == 0) + return; + /* + * Replica sends us last replicated transaction timestamp which is + * needed for relay lag monitoring. Note that this transaction has been + * written to WAL with our current realtime clock value, thus when it + * get reported back we can compute time spent regardless of the clock + * value on remote replica. Update the lag only when the timestamp + * corresponds to some transaction the replica has just applied, i.e. + * received vclock is bigger than the previous one. + */ + const struct vclock *prev_vclock = &relay->status_msg.vclock; + const struct vclock *next_vclock = &relay->last_recv_ack.vclock; + if (vclock_get(prev_vclock, instance_id) < + vclock_get(next_vclock, instance_id)) + relay->txn_lag = ev_now(loop()) - tm; +} + /* * Relay reader fiber function. * Read xrow encoded vclocks sent by the replica. @@ -747,7 +769,6 @@ relay_reader_f(va_list ap) struct ibuf ibuf; ibuf_create(&ibuf, &cord()->slabc, 1024); - struct relay_status_msg *status_msg = &relay->status_msg; struct applier_heartbeat *last_recv_ack = &relay->last_recv_ack; try { while (!fiber_is_cancelled()) { @@ -756,22 +777,7 @@ relay_reader_f(va_list ap) coio_read_xrow_timeout_xc(relay->io, &ibuf, &xrow, replication_disconnect_timeout()); xrow_decode_applier_heartbeat_xc(&xrow, last_recv_ack); - /* - * Replica send us last replicated transaction - * timestamp which is needed for relay lag - * monitoring. Note that this transaction has - * been written to WAL with our current realtime - * clock value, thus when it get reported back we - * can compute time spent regardless of the clock - * value on remote replica. Update the lag only when the - * timestamp corresponds to some transaction the replica - * has just applied, i.e. received vclock is bigger than - * the previous one. - */ - if (xrow.tm != 0 && - vclock_get(&status_msg->vclock, instance_id) < - vclock_get(&last_recv_ack->vclock, instance_id)) - relay->txn_lag = ev_now(loop()) - xrow.tm; + relay_process_ack(relay, xrow.tm); fiber_cond_signal(&relay->reader_cond); } } catch (Exception *e) {