diff --git a/src/box/applier.cc b/src/box/applier.cc
index b51fed10367dcc72cf3afb6caeaa64ed96b06d06..0e621f6f890140cf627f01e87494f63d97378f24 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -1505,12 +1505,7 @@ apply_final_join_tx(uint32_t replica_id, struct stailq *rows)
 {
 	struct applier_tx_row *txr =
 		stailq_first_entry(rows, struct applier_tx_row, next);
-
-	struct xrow_header *last_row =
-		&stailq_last_entry(rows, struct applier_tx_row, next)->row;
 	int rc = 0;
-	/* WAL isn't enabled yet, so follow vclock manually. */
-	vclock_follow_xrow(instance_vclock, last_row);
 	if (unlikely(iproto_type_is_synchro_request(txr->row.type))) {
 		rc = apply_synchro_req(replica_id, &txr->row,
 				       &txr->req.synchro);
diff --git a/src/box/box.cc b/src/box/box.cc
index 2da0bd22225b6a8a6febe50b131cc058aa3b1c0d..3e09ddcf241eb37422eca8a3b32bda7557e4f9d4 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -610,6 +610,8 @@ recovery_journal_write(struct journal *base, struct journal_entry *entry)
 {
 	struct recovery_journal *journal = (struct recovery_journal *) base;
 
+	for (int i = 0; i < entry->n_rows; ++i)
+		vclock_follow_xrow(journal->vclock, entry->rows[i]);
 	entry->res = vclock_sum(journal->vclock);
 	/*
 	 * Since there're no actual writes, fire a
@@ -695,6 +697,8 @@ wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row)
 		say_error("couldn't decode a synchro request");
 		return -1;
 	}
+	if (journal_write_row(row) != 0)
+		return -1;
 	return txn_limbo_process(&txn_limbo, &syn_req);
 }
 
@@ -712,6 +716,8 @@ wal_stream_apply_raft_row(struct wal_stream *stream, struct xrow_header *row)
 		say_error("couldn't decode a raft request");
 		return -1;
 	}
+	if (journal_write_row(row) != 0)
+		return -1;
 	box_raft_recover(&raft_req);
 	return 0;
 }
@@ -788,6 +794,12 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
 			say_error("couldn't apply the request");
 			goto end_diag_request;
 		}
+	} else {
+		if (txn_begin_stmt(txn, NULL, request.type) != 0)
+			goto end_diag_request;
+		if (txn_commit_stmt(txn, &request))
+			goto end_diag_request;
+	}
 
 	assert(txn != NULL);
 	if (!row->is_commit)
@@ -4823,10 +4835,14 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	 * other engines.
	 */
 	memtx_engine_recover_snapshot_xc(memtx, checkpoint_vclock);
-	box_run_on_recovery_state(RECOVERY_STATE_SNAPSHOT_RECOVERED);
-
-	recovery_journal_create(&recovery->vclock);
+	/*
+	 * Xlog starts after snapshot. Hence recovery vclock must point at the
+	 * end of snapshot (= checkpoint vclock).
+	 */
+	struct vclock recovery_vclock;
+	vclock_copy(&recovery_vclock, checkpoint_vclock);
+	recovery_journal_create(&recovery_vclock);
 	engine_begin_final_recovery_xc();
 	recover_remaining_wals(recovery, &wal_stream.base, NULL, false);
 	if (wal_stream_has_unfinished_tx(&wal_stream)) {