From 74d8db74f97ff8346d8acb2953ed280db2887a20 Mon Sep 17 00:00:00 2001 From: Vladimir Davydov <vdavydov.dev@gmail.com> Date: Fri, 23 Nov 2018 20:35:23 +0300 Subject: [PATCH] wal: separate checkpoint and flush paths Currently, wal_checkpoint() is used for two purposes. First, to make a checkpoint (rotate = true). Second, to flush all pending WAL requests (rotate = false). Since checkpointing has to fail if cascading rollback is in progress so does flushing. This is confusing. Let's separate the two paths. While we are at it, let's also rewrite WAL checkpointing using cbus_call instead of cpipe_push as it's a more convenient way of exchanging simple two-hop messages between two threads. --- src/box/box.cc | 5 ++- src/box/vinyl.c | 5 +-- src/box/wal.c | 91 +++++++++++++++++++++++-------------------------- src/box/wal.h | 15 +++++--- 4 files changed, 57 insertions(+), 59 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index 8d6e966e02..5ea2f01445 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -2190,10 +2190,9 @@ box_checkpoint() goto end; struct vclock vclock; - if ((rc = wal_checkpoint(&vclock, true))) { - tnt_error(ClientError, ER_CHECKPOINT_ROLLBACK); + if ((rc = wal_checkpoint(&vclock))) goto end; - } + rc = engine_commit_checkpoint(&vclock); end: if (rc) diff --git a/src/box/vinyl.c b/src/box/vinyl.c index 1794489d59..f5b36ce14d 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -1014,10 +1014,7 @@ vy_abort_writers_for_ddl(struct vy_env *env, struct vy_lsm *lsm) * Wait for prepared transactions to complete * (we can't abort them as they reached WAL). */ - struct vclock unused; - if (wal_checkpoint(&unused, false) != 0) - return -1; - + wal_sync(); return 0; } diff --git a/src/box/wal.c b/src/box/wal.c index 11aae5fcb6..524d47dc9b 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -461,29 +461,39 @@ wal_thread_stop() wal_writer_destroy(&wal_writer_singleton); } +void +wal_sync(void) +{ + struct wal_writer *writer = &wal_writer_singleton; + if (writer->wal_mode == WAL_NONE) + return; + cbus_flush(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, NULL); +} + struct wal_checkpoint { - struct cmsg base; - struct vclock *vclock; - struct fiber *fiber; - bool rotate; - int res; + struct cbus_call_msg base; + struct vclock vclock; }; -void -wal_checkpoint_f(struct cmsg *data) +static int +wal_checkpoint_f(struct cbus_call_msg *data) { struct wal_checkpoint *msg = (struct wal_checkpoint *) data; struct wal_writer *writer = &wal_writer_singleton; if (writer->in_rollback.route != NULL) { - /* We're rolling back a failed write. */ - msg->res = -1; - return; + /* + * We're rolling back a failed write and so + * can't make a checkpoint - see the comment + * in wal_checkpoint() for the explanation. + */ + diag_set(ClientError, ER_CHECKPOINT_ROLLBACK); + return -1; } /* * Avoid closing the current WAL if it has no rows (empty). */ - if (msg->rotate && xlog_is_open(&writer->current_wal) && + if (xlog_is_open(&writer->current_wal) && vclock_sum(&writer->current_wal.meta.vclock) != vclock_sum(&writer->vclock)) { @@ -492,53 +502,38 @@ wal_checkpoint_f(struct cmsg *data) * The next WAL will be created on the first write. */ } - vclock_copy(msg->vclock, &writer->vclock); -} - -void -wal_checkpoint_done_f(struct cmsg *data) -{ - struct wal_checkpoint *msg = (struct wal_checkpoint *) data; - fiber_wakeup(msg->fiber); + vclock_copy(&msg->vclock, &writer->vclock); + return 0; } int -wal_checkpoint(struct vclock *vclock, bool rotate) +wal_checkpoint(struct vclock *vclock) { struct wal_writer *writer = &wal_writer_singleton; - if (! stailq_empty(&writer->rollback)) { - /* - * The writer rollback queue is not empty, - * roll back this transaction immediately. - * This is to ensure we do not accidentally - * commit a transaction which has seen changes - * that will be rolled back. - */ - say_error("Aborting transaction %llu during " - "cascading rollback", - vclock_sum(&writer->vclock)); - return -1; - } if (writer->wal_mode == WAL_NONE) { vclock_copy(vclock, &writer->vclock); return 0; } - static struct cmsg_hop wal_checkpoint_route[] = { - {wal_checkpoint_f, &wal_thread.tx_prio_pipe}, - {wal_checkpoint_done_f, NULL}, - }; - vclock_create(vclock); + if (!stailq_empty(&writer->rollback)) { + /* + * If cascading rollback is in progress, in-memory + * indexes can contain changes scheduled for rollback. + * If we made a checkpoint, we could write them to + * the snapshot. So we abort checkpointing in this + * case. + */ + diag_set(ClientError, ER_CHECKPOINT_ROLLBACK); + return -1; + } struct wal_checkpoint msg; - cmsg_init(&msg.base, wal_checkpoint_route); - msg.vclock = vclock; - msg.fiber = fiber(); - msg.rotate = rotate; - msg.res = 0; - cpipe_push(&wal_thread.wal_pipe, &msg.base); - fiber_set_cancellable(false); - fiber_yield(); - fiber_set_cancellable(true); - return msg.res; + bool cancellable = fiber_set_cancellable(false); + int rc = cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, + &msg.base, wal_checkpoint_f, NULL, TIMEOUT_INFINITY); + fiber_set_cancellable(cancellable); + if (rc != 0) + return -1; + vclock_copy(vclock, &msg.vclock); + return 0; } struct wal_gc_msg diff --git a/src/box/wal.h b/src/box/wal.h index e4094b1e49..380b098c65 100644 --- a/src/box/wal.h +++ b/src/box/wal.h @@ -166,13 +166,20 @@ wal_mode(); /** * Wait till all pending changes to the WAL are flushed. - * Rotates the WAL. - * - * @param[out] vclock WAL vclock + */ +void +wal_sync(void); + +/** + * Prepare WAL for checkpointing. * + * This function flushes all pending changes and rotates the + * current WAL. The vclock of the last record written to the + * rotated WAL is returned in @vclock. This is the vclock that + * is supposed to be used to identify the new checkpoint. */ int -wal_checkpoint(struct vclock *vclock, bool rotate); +wal_checkpoint(struct vclock *vclock); /** * Remove WAL files that are not needed by consumers reading -- GitLab