diff --git a/src/box/box.cc b/src/box/box.cc index 5b3aba2b0ad097ad4babf49614380875d5197562..d630f20540d293215b67cdae8d78a3d0a28a9470 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1061,8 +1061,7 @@ box_process_join(struct ev_io *io, struct xrow_header *header) /* Remember master's vclock after the last request */ struct vclock stop_vclock; - /* TODO: use WAL vclock instead of TX vclock */ - vclock_copy(&stop_vclock, &recovery->vclock); + wal_checkpoint(wal, &stop_vclock, false); /* Send end of initial stage data marker */ xrow_encode_vclock(&row, &stop_vclock); @@ -1077,7 +1076,9 @@ box_process_join(struct ev_io *io, struct xrow_header *header) say_info("final data sent."); /* Send end of WAL stream marker */ - xrow_encode_vclock(&row, &recovery->vclock); + struct vclock current_vclock; + wal_checkpoint(wal, ¤t_vclock, false); + xrow_encode_vclock(&row, ¤t_vclock); row.sync = header->sync; coio_write_xrow(io, &row); } @@ -1133,7 +1134,9 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header) * and identify ourselves with our own server id. */ struct xrow_header row; - xrow_encode_vclock(&row, &recovery->vclock); + struct vclock current_vclock; + wal_checkpoint(wal, ¤t_vclock, true); + xrow_encode_vclock(&row, ¤t_vclock); /* * Identify the message with the server id of this * server, this is the only way for a replica to find @@ -1458,9 +1461,8 @@ box_snapshot() struct vclock vclock; if (wal == NULL) { vclock_copy(&vclock, &recovery->vclock); - } else if (wal_checkpoint(wal, &vclock) == -1) { - rc = EIO; - goto end; + } else { + wal_checkpoint(wal, &vclock, true); } rc = engine_commit_checkpoint(&vclock); end: diff --git a/src/box/lua/info.c b/src/box/lua/info.c index 3e14d99215e2d945bd4e20b6bc03f91c3d6a64e9..6d4189e80ec48e64919b7db7d79e2a5bec665f56 100644 --- a/src/box/lua/info.c +++ b/src/box/lua/info.c @@ -39,6 +39,7 @@ #include "box/applier.h" #include "box/recovery.h" +#include "box/wal.h" #include "box/cluster.h" #include "main.h" #include "box/box.h" @@ -138,9 +139,10 @@ lbox_info_server(struct lua_State *L) lua_pushlstring(L, tt_uuid_str(&SERVER_UUID), UUID_STR_LEN); lua_settable(L, -3); lua_pushliteral(L, "lsn"); - if (recovery->server_id != SERVER_ID_NIL) { - luaL_pushint64(L, vclock_get(&recovery->vclock, - recovery->server_id)); + if (recovery->server_id != SERVER_ID_NIL && wal != NULL) { + struct vclock vclock; + wal_checkpoint(wal, &vclock, false); + luaL_pushint64(L, vclock_get(&vclock, recovery->server_id)); } else { luaL_pushint64(L, -1); } @@ -155,7 +157,13 @@ lbox_info_server(struct lua_State *L) static int lbox_info_vclock(struct lua_State *L) { - lbox_pushvclock(L, &recovery->vclock); + struct vclock vclock; + if (wal != NULL) { + wal_checkpoint(wal, &vclock, false); + } else { + vclock_create(&vclock); + } + lbox_pushvclock(L, &vclock); return 1; } diff --git a/src/box/wal.cc b/src/box/wal.cc index 89a2a12ebfab28e5354e9270c15ba8d80835726f..f9e4dff22569fcddf3284d81ee0293b62a97e8a2 100644 --- a/src/box/wal.cc +++ b/src/box/wal.cc @@ -342,6 +342,7 @@ struct wal_checkpoint: public cmsg { struct vclock *vclock; struct fiber *fiber; + bool rotate; }; void @@ -352,7 +353,7 @@ wal_checkpoint_f(struct cmsg *data) /* * Avoid closing the current WAL if it has no rows (empty). */ - if (writer->current_wal != NULL && + if (msg->rotate && writer->current_wal != NULL && vclock_sum(&writer->current_wal->vclock) != vclock_sum(&writer->vclock)) { @@ -373,8 +374,8 @@ wal_checkpoint_done_f(struct cmsg *data) fiber_wakeup(msg->fiber); } -int64_t -wal_checkpoint(struct wal_writer *writer, struct vclock *vclock) +void +wal_checkpoint(struct wal_writer *writer, struct vclock *vclock, bool rotate) { static struct cmsg_hop wal_checkpoint_route[] = { {wal_checkpoint_f, &wal_writer_singleton.tx_pipe}, @@ -385,9 +386,11 @@ wal_checkpoint(struct wal_writer *writer, struct vclock *vclock) cmsg_init(&msg, wal_checkpoint_route); msg.vclock = vclock; msg.fiber = fiber(); + msg.rotate = rotate; cpipe_push(&writer->wal_pipe, &msg); + fiber_set_cancellable(false); fiber_yield(); - return vclock_size(vclock) ? vclock_sum(vclock) : -1; + fiber_set_cancellable(true); } /** diff --git a/src/box/wal.h b/src/box/wal.h index 19452e5559906548bea98320a21e307a8234370f..dafe579c1a214f448230e6e5ab9c9382fe5825bd 100644 --- a/src/box/wal.h +++ b/src/box/wal.h @@ -67,17 +67,6 @@ struct wal_request { int64_t wal_write(struct wal_writer *writer, struct wal_request *req); -/** - * Wait till all pending changes to the WAL are flushed. - * Rotates the WAL. - * - * @param[out] vclock WAL vclock - * - * @retval -1 error - * @retval lsn success - */ -int64_t -wal_checkpoint(struct wal_writer *writer, struct vclock *vclock); void wal_writer_start(enum wal_mode wal_mode, const char *wal_dirname, @@ -109,6 +98,22 @@ wal_clear_watcher(struct wal_writer *, struct wal_watcher *); void wal_atfork(); +extern "C" { +#endif /* defined(__cplusplus) */ + +/** + * Wait till all pending changes to the WAL are flushed. + * Rotates the WAL. + * + * @param[out] vclock WAL vclock + * + */ +void +wal_checkpoint(struct wal_writer *writer, struct vclock *vclock, + bool rotate); + +#if defined(__cplusplus) +} /* extern "C" */ #endif /* defined(__cplusplus) */ #endif /* TARANTOOL_WAL_WRITER_H_INCLUDED */ diff --git a/test/xlog/panic_on_lsn_gap.result b/test/xlog/panic_on_lsn_gap.result index d1615de462a3ef3055423c640ac1617c69b0680a..313850a664a87f0128ed15aae9bc946f051dd5ce 100644 --- a/test/xlog/panic_on_lsn_gap.result +++ b/test/xlog/panic_on_lsn_gap.result @@ -91,7 +91,7 @@ name = string.match(arg[0], "([^,]+)%.lua") ... box.info.vclock --- -- {1: 11} +- {1: 1} ... require('fio').glob(name .. "/*.xlog") --- @@ -153,7 +153,7 @@ t ... box.info.vclock --- -- {1: 11} +- {1: 1} ... box.error.injection.set("ERRINJ_WAL_WRITE", false) --- @@ -229,7 +229,7 @@ box.space._schema:replace{"key", 'test 3'} ... box.info.vclock --- -- {1: 23} +- {1: 22} ... require('fio').glob(name .. "/*.xlog") --- @@ -245,7 +245,7 @@ box.space._schema:replace{"key", 'test 3'} ... box.info.vclock --- -- {1: 24} +- {1: 22} ... require('fio').glob(name .. "/*.xlog") ---