diff --git a/src/box/box.cc b/src/box/box.cc index e12a1cba423b6fbfd85be48159edc772054f9402..fde3ecba96f0e96de4191beae34a400ed281cf51 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1691,7 +1691,7 @@ box_free(void) sequence_free(); gc_free(); engine_shutdown(); - wal_thread_stop(); + wal_free(); } } @@ -1761,12 +1761,13 @@ bootstrap_master(const struct tt_uuid *replicaset_uuid) /* Set UUID of a new replica set */ box_set_replicaset_uuid(replicaset_uuid); + /* Enable WAL subsystem. */ + if (wal_enable() != 0) + diag_raise(); + /* Make the initial checkpoint */ - if (engine_begin_checkpoint() || - engine_commit_checkpoint(&replicaset.vclock)) + if (gc_checkpoint() != 0) panic("failed to create a checkpoint"); - - gc_add_checkpoint(&replicaset.vclock); } /** @@ -1813,9 +1814,6 @@ bootstrap_from_master(struct replica *master) applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY); - /* Clear the pointer to journal before it goes out of scope */ - journal_set(NULL); - /* Finalize the new replica */ engine_end_recovery_xc(); @@ -1823,12 +1821,22 @@ bootstrap_from_master(struct replica *master) applier_resume_to_state(applier, APPLIER_READY, TIMEOUT_INFINITY); assert(applier->state == APPLIER_READY); + /* + * An engine may write to WAL on its own during the join + * stage (e.g. Vinyl's deferred DELETEs). That's OK - those + * records will pass through the recovery journal and wind + * up in the initial checkpoint. However, we must enable + * the WAL right before starting checkpointing so that + * records written during and after the initial checkpoint + * go to the real WAL and can be recovered after restart. + * This also clears the recovery journal created on stack. + */ + if (wal_enable() != 0) + diag_raise(); + /* Make the initial checkpoint */ - if (engine_begin_checkpoint() || - engine_commit_checkpoint(&replicaset.vclock)) + if (gc_checkpoint() != 0) panic("failed to create a checkpoint"); - - gc_add_checkpoint(&replicaset.vclock); } /** @@ -2000,6 +2008,16 @@ local_recovery(const struct tt_uuid *instance_uuid, box_sync_replication(false); } recovery_finalize(recovery); + + /* + * We must enable WAL before finalizing engine recovery, + * because an engine may start writing to WAL right after + * this point (e.g. deferred DELETE statements in Vinyl). + * This also clears the recovery journal created on stack. + */ + if (wal_enable() != 0) + diag_raise(); + engine_end_recovery_xc(); /* Check replica set UUID. */ @@ -2009,9 +2027,6 @@ local_recovery(const struct tt_uuid *instance_uuid, tt_uuid_str(replicaset_uuid), tt_uuid_str(&REPLICASET_UUID)); } - - /* Clear the pointer to journal before it goes out of scope */ - journal_set(NULL); } static void @@ -2083,7 +2098,15 @@ box_cfg_xc(void) port_init(); iproto_init(); sql_init(); - wal_thread_start(); + + int64_t wal_max_rows = box_check_wal_max_rows(cfg_geti64("rows_per_wal")); + int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size")); + enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode")); + if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_rows, + wal_max_size, &INSTANCE_UUID, on_wal_garbage_collection, + on_wal_checkpoint_threshold) != 0) { + diag_raise(); + } title("loading"); @@ -2128,8 +2151,6 @@ box_cfg_xc(void) /* Bootstrap a new master */ bootstrap(&instance_uuid, &replicaset_uuid, &is_bootstrap_leader); - checkpoint = gc_last_checkpoint(); - assert(checkpoint != NULL); } fiber_gc(); @@ -2143,17 +2164,6 @@ box_cfg_xc(void) } } - /* Start WAL writer */ - int64_t wal_max_rows = box_check_wal_max_rows(cfg_geti64("rows_per_wal")); - int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size")); - enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode")); - if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_rows, - wal_max_size, &INSTANCE_UUID, &replicaset.vclock, - &checkpoint->vclock, on_wal_garbage_collection, - on_wal_checkpoint_threshold) != 0) { - diag_raise(); - } - rmean_cleanup(rmean_box); /* Follow replica */ diff --git a/src/box/wal.c b/src/box/wal.c index cdcaabc0089361bf4156c549933fa15d6f1863fa..0b49548c0ef58ef6256c2679f9cd4c26628745fa 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -66,19 +66,6 @@ wal_write(struct journal *, struct journal_entry *); static int64_t wal_write_in_wal_mode_none(struct journal *, struct journal_entry *); -/* WAL thread. */ -struct wal_thread { - /** 'wal' thread doing the writes. */ - struct cord cord; - /** A pipe from 'tx' thread to 'wal' */ - struct cpipe wal_pipe; - /** - * Return pipe from 'wal' to tx'. This is a - * priority pipe and DOES NOT support yield. - */ - struct cpipe tx_prio_pipe; -}; - /* * WAL writer - maintain a Write Ahead Log for every change * in the data state. @@ -100,6 +87,8 @@ struct wal_writer * the wal-tx bus and are rolled back "on arrival". */ struct stailq rollback; + /** A pipe from 'tx' thread to 'wal' */ + struct cpipe wal_pipe; /* ----------------- wal ------------------- */ /** A setting from instance configuration - rows_per_wal */ int64_t wal_max_rows; @@ -109,6 +98,13 @@ struct wal_writer enum wal_mode wal_mode; /** wal_dir, from the configuration file. */ struct xdir wal_dir; + /** 'wal' thread doing the writes. */ + struct cord cord; + /** + * Return pipe from 'wal' to tx'. This is a + * priority pipe and DOES NOT support yield. + */ + struct cpipe tx_prio_pipe; /** * The vector clock of the WAL writer. It's a bit behind * the vector clock of the transaction thread, since it @@ -184,7 +180,6 @@ struct vy_log_writer { }; static struct vy_log_writer vy_log_writer; -static struct wal_thread wal_thread; static struct wal_writer wal_writer_singleton; enum wal_mode @@ -200,7 +195,7 @@ static void tx_schedule_commit(struct cmsg *msg); static struct cmsg_hop wal_request_route[] = { - {wal_write_to_disk, &wal_thread.tx_prio_pipe}, + {wal_write_to_disk, &wal_writer_singleton.tx_prio_pipe}, {tx_schedule_commit, NULL}, }; @@ -349,8 +344,6 @@ static void wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows, int64_t wal_max_size, const struct tt_uuid *instance_uuid, - const struct vclock *vclock, - const struct vclock *checkpoint_vclock, wal_on_garbage_collection_f on_garbage_collection, wal_on_checkpoint_threshold_f on_checkpoint_threshold) { @@ -372,8 +365,8 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode, writer->checkpoint_threshold = INT64_MAX; writer->checkpoint_triggered = false; - vclock_copy(&writer->vclock, vclock); - vclock_copy(&writer->checkpoint_vclock, checkpoint_vclock); + vclock_create(&writer->vclock); + vclock_create(&writer->checkpoint_vclock); rlist_create(&writer->watchers); writer->on_garbage_collection = on_garbage_collection; @@ -387,21 +380,9 @@ wal_writer_destroy(struct wal_writer *writer) xdir_destroy(&writer->wal_dir); } -/** WAL thread routine. */ +/** WAL writer thread routine. */ static int -wal_thread_f(va_list ap); - -/** Start WAL thread and setup pipes to and from TX. */ -void -wal_thread_start() -{ - if (cord_costart(&wal_thread.cord, "wal", wal_thread_f, NULL) != 0) - panic("failed to start WAL thread"); - - /* Create a pipe to WAL thread. */ - cpipe_create(&wal_thread.wal_pipe, "wal"); - cpipe_set_max_input(&wal_thread.wal_pipe, IOV_MAX); -} +wal_writer_f(va_list ap); static int wal_open_f(struct cbus_call_msg *msg) @@ -440,7 +421,7 @@ wal_open(struct wal_writer *writer) * thread. */ struct cbus_call_msg msg; - if (cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, &msg, + if (cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, &msg, wal_open_f, NULL, TIMEOUT_INFINITY) == 0) { /* * Success: we can now append to @@ -472,28 +453,38 @@ wal_open(struct wal_writer *writer) return 0; } -/** - * Initialize WAL writer. - * - * @pre The instance has completed recovery from a snapshot - * and/or existing WALs. All WALs opened in read-only - * mode are closed. WAL thread has been started. - */ int wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows, int64_t wal_max_size, const struct tt_uuid *instance_uuid, - const struct vclock *vclock, const struct vclock *checkpoint_vclock, wal_on_garbage_collection_f on_garbage_collection, wal_on_checkpoint_threshold_f on_checkpoint_threshold) { assert(wal_max_rows > 1); + /* Initialize the state. */ struct wal_writer *writer = &wal_writer_singleton; wal_writer_create(writer, wal_mode, wal_dirname, wal_max_rows, - wal_max_size, instance_uuid, vclock, - checkpoint_vclock, on_garbage_collection, + wal_max_size, instance_uuid, on_garbage_collection, on_checkpoint_threshold); + /* Start WAL thread. */ + if (cord_costart(&writer->cord, "wal", wal_writer_f, NULL) != 0) + return -1; + + /* Create a pipe to WAL thread. */ + cpipe_create(&writer->wal_pipe, "wal"); + cpipe_set_max_input(&writer->wal_pipe, IOV_MAX); + return 0; +} + +int +wal_enable(void) +{ + struct wal_writer *writer = &wal_writer_singleton; + + /* Initialize the writer vclock from the recovery state. */ + vclock_copy(&writer->vclock, &replicaset.vclock); + /* * Scan the WAL directory to build an index of all * existing WAL files. Required for garbage collection, @@ -502,29 +493,28 @@ wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows, if (xdir_scan(&writer->wal_dir)) return -1; + /* Open the most recent WAL file. */ if (wal_open(writer) != 0) return -1; + /* Enable journalling. */ journal_set(&writer->base); return 0; } -/** - * Stop WAL thread, wait until it exits, and destroy WAL writer - * if it was initialized. Called on shutdown. - */ void -wal_thread_stop() +wal_free(void) { - cbus_stop_loop(&wal_thread.wal_pipe); + struct wal_writer *writer = &wal_writer_singleton; - if (cord_join(&wal_thread.cord)) { + cbus_stop_loop(&writer->wal_pipe); + + if (cord_join(&writer->cord)) { /* We can't recover from this in any reasonable way. */ panic_syserror("WAL writer: thread join failed"); } - if (journal_is_initialized(&wal_writer_singleton.base)) - wal_writer_destroy(&wal_writer_singleton); + wal_writer_destroy(writer); } void @@ -533,7 +523,7 @@ wal_sync(void) struct wal_writer *writer = &wal_writer_singleton; if (writer->wal_mode == WAL_NONE) return; - cbus_flush(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, NULL); + cbus_flush(&writer->wal_pipe, &writer->tx_prio_pipe, NULL); } static int @@ -588,7 +578,7 @@ wal_begin_checkpoint(struct wal_checkpoint *checkpoint) return -1; } bool cancellable = fiber_set_cancellable(false); - int rc = cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, + int rc = cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, &checkpoint->base, wal_begin_checkpoint_f, NULL, TIMEOUT_INFINITY); fiber_set_cancellable(cancellable); @@ -628,7 +618,7 @@ wal_commit_checkpoint(struct wal_checkpoint *checkpoint) return; } bool cancellable = fiber_set_cancellable(false); - cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, + cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, &checkpoint->base, wal_commit_checkpoint_f, NULL, TIMEOUT_INFINITY); fiber_set_cancellable(cancellable); @@ -658,7 +648,7 @@ wal_set_checkpoint_threshold(int64_t threshold) struct wal_set_checkpoint_threshold_msg msg; msg.checkpoint_threshold = threshold; bool cancellable = fiber_set_cancellable(false); - cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, + cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, &msg.base, wal_set_checkpoint_threshold_f, NULL, TIMEOUT_INFINITY); fiber_set_cancellable(cancellable); @@ -707,7 +697,7 @@ wal_collect_garbage(const struct vclock *vclock) struct wal_gc_msg msg; msg.vclock = vclock; bool cancellable = fiber_set_cancellable(false); - cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, &msg.base, + cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, &msg.base, wal_collect_garbage_f, NULL, TIMEOUT_INFINITY); fiber_set_cancellable(cancellable); } @@ -841,7 +831,7 @@ wal_fallocate(struct wal_writer *writer, size_t len) &msg->vclock) < 0) vclock_copy(&msg->vclock, &writer->vclock); cmsg_init(&msg->base, route); - cpipe_push(&wal_thread.tx_prio_pipe, &msg->base); + cpipe_push(&writer->tx_prio_pipe, &msg->base); } else say_warn("failed to allocate gc notification message"); } @@ -874,14 +864,14 @@ wal_writer_begin_rollback(struct wal_writer *writer) * valve is closed by non-empty writer->rollback * list. */ - { wal_writer_clear_bus, &wal_thread.wal_pipe }, - { wal_writer_clear_bus, &wal_thread.tx_prio_pipe }, + { wal_writer_clear_bus, &wal_writer_singleton.wal_pipe }, + { wal_writer_clear_bus, &wal_writer_singleton.tx_prio_pipe }, /* * Step 2: writer->rollback queue contains all * messages which need to be rolled back, * perform the rollback. */ - { tx_schedule_rollback, &wal_thread.wal_pipe }, + { tx_schedule_rollback, &wal_writer_singleton.wal_pipe }, /* * Step 3: re-open the WAL for writing. */ @@ -893,7 +883,7 @@ wal_writer_begin_rollback(struct wal_writer *writer) * all input until rollback mode is off. */ cmsg_init(&writer->in_rollback, rollback_route); - cpipe_push(&wal_thread.tx_prio_pipe, &writer->in_rollback); + cpipe_push(&writer->tx_prio_pipe, &writer->in_rollback); } static void @@ -1013,7 +1003,7 @@ wal_write_to_disk(struct cmsg *msg) struct cmsg *msg = malloc(sizeof(*msg)); if (msg != NULL) { cmsg_init(msg, route); - cpipe_push(&wal_thread.tx_prio_pipe, msg); + cpipe_push(&writer->tx_prio_pipe, msg); writer->checkpoint_triggered = true; } else { say_warn("failed to allocate checkpoint " @@ -1056,11 +1046,12 @@ wal_write_to_disk(struct cmsg *msg) wal_notify_watchers(writer, WAL_EVENT_WRITE); } -/** WAL thread main loop. */ +/** WAL writer main loop. */ static int -wal_thread_f(va_list ap) +wal_writer_f(va_list ap) { (void) ap; + struct wal_writer *writer = &wal_writer_singleton; /** Initialize eio in this thread */ coio_enable(); @@ -1072,12 +1063,10 @@ wal_thread_f(va_list ap) * endpoint, to ensure that WAL messages are delivered * even when tx fiber pool is used up by net messages. */ - cpipe_create(&wal_thread.tx_prio_pipe, "tx_prio"); + cpipe_create(&writer->tx_prio_pipe, "tx_prio"); cbus_loop(&endpoint); - struct wal_writer *writer = &wal_writer_singleton; - /* * Create a new empty WAL on shutdown so that we don't * have to rescan the last WAL to find the instance vclock. @@ -1101,7 +1090,7 @@ wal_thread_f(va_list ap) if (xlog_is_open(&vy_log_writer.xlog)) xlog_close(&vy_log_writer.xlog, false); - cpipe_destroy(&wal_thread.tx_prio_pipe); + cpipe_destroy(&writer->tx_prio_pipe); return 0; } @@ -1131,8 +1120,8 @@ wal_write(struct journal *journal, struct journal_entry *entry) } struct wal_msg *batch; - if (!stailq_empty(&wal_thread.wal_pipe.input) && - (batch = wal_msg(stailq_first_entry(&wal_thread.wal_pipe.input, + if (!stailq_empty(&writer->wal_pipe.input) && + (batch = wal_msg(stailq_first_entry(&writer->wal_pipe.input, struct cmsg, fifo)))) { stailq_add_tail_entry(&batch->commit, entry, fifo); @@ -1151,11 +1140,11 @@ wal_write(struct journal *journal, struct journal_entry *entry) * thread right away. */ stailq_add_tail_entry(&batch->commit, entry, fifo); - cpipe_push(&wal_thread.wal_pipe, &batch->base); + cpipe_push(&writer->wal_pipe, &batch->base); } batch->approx_len += entry->approx_len; - wal_thread.wal_pipe.n_input += entry->n_rows * XROW_IOVMAX; - cpipe_flush_input(&wal_thread.wal_pipe); + writer->wal_pipe.n_input += entry->n_rows * XROW_IOVMAX; + cpipe_flush_input(&writer->wal_pipe); /** * It's not safe to spuriously wakeup this fiber * since in that case it will ignore a possible @@ -1213,10 +1202,11 @@ wal_write_vy_log_f(struct cbus_call_msg *msg) int wal_write_vy_log(struct journal_entry *entry) { + struct wal_writer *writer = &wal_writer_singleton; struct wal_write_vy_log_msg msg; msg.entry= entry; bool cancellable = fiber_set_cancellable(false); - int rc = cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, + int rc = cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, &msg.base, wal_write_vy_log_f, NULL, TIMEOUT_INFINITY); fiber_set_cancellable(cancellable); @@ -1235,9 +1225,10 @@ wal_rotate_vy_log_f(struct cbus_call_msg *msg) void wal_rotate_vy_log() { + struct wal_writer *writer = &wal_writer_singleton; struct cbus_call_msg msg; bool cancellable = fiber_set_cancellable(false); - cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, &msg, + cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, &msg, wal_rotate_vy_log_f, NULL, TIMEOUT_INFINITY); fiber_set_cancellable(cancellable); } diff --git a/src/box/wal.h b/src/box/wal.h index a9452f2bd4eefadf0f5a1d24f7a6cd8bee3cbab8..4e500d2a30fa62f40354cd1787d990a5bf9324ff 100644 --- a/src/box/wal.h +++ b/src/box/wal.h @@ -68,18 +68,26 @@ typedef void (*wal_on_garbage_collection_f)(const struct vclock *vclock); */ typedef void (*wal_on_checkpoint_threshold_f)(void); -void -wal_thread_start(); - +/** + * Start WAL thread and initialize WAL writer. + */ int wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows, int64_t wal_max_size, const struct tt_uuid *instance_uuid, - const struct vclock *vclock, const struct vclock *checkpoint_vclock, wal_on_garbage_collection_f on_garbage_collection, wal_on_checkpoint_threshold_f on_checkpoint_threshold); +/** + * Setup WAL writer as journaling subsystem. + */ +int +wal_enable(void); + +/** + * Stop WAL thread and free WAL writer resources. + */ void -wal_thread_stop(); +wal_free(void); struct wal_watcher_msg { struct cmsg cmsg; diff --git a/test/vinyl/replica_quota.result b/test/vinyl/replica_quota.result index 50e3971998a6601fcb89a2152137baad659c73c2..bd09e764d5d8a257f619dc79cc6bd0c3344580a7 100644 --- a/test/vinyl/replica_quota.result +++ b/test/vinyl/replica_quota.result @@ -10,18 +10,40 @@ s = box.schema.space.create('test', { engine = 'vinyl' }) _ = s:create_index('pk', {run_count_per_level = 1}) --- ... --- Send > 2 MB to replica. -pad = string.rep('x', 1100) +_ = s:create_index('sk', {unique = false, parts = {2, 'unsigned'}}) --- ... -for i = 1,1000 do s:insert{i, pad} end +test_run:cmd("setopt delimiter ';'") +--- +- true +... +pad = string.rep('x', 10000); +--- +... +function fill() + for i = 1, 50 do + box.begin() + for j = 1, 10 do + s:replace{math.random(100), math.random(100), pad} + end + box.commit() + end +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +-- Send > 1 MB to replica. +fill() --- ... box.snapshot() --- - ok ... -for i = 1001,2000 do s:insert{i, pad} end +fill() --- ... -- Replica has memory limit set to 1 MB so replication would hang @@ -46,7 +68,7 @@ _ = test_run:wait_lsn('replica', 'default') _ = test_run:cmd("stop server replica") --- ... -for i = 2001,3000 do s:insert{i, pad} end +fill() --- ... _ = test_run:cmd("start server replica") @@ -67,7 +89,7 @@ box.snapshot() --- - ok ... -for i = 3001,4000 do s:insert{i, pad} end +fill() --- ... _ = test_run:cmd("start server replica") -- join diff --git a/test/vinyl/replica_quota.test.lua b/test/vinyl/replica_quota.test.lua index e04abbc22e73ccd5fff8b8a73cf92ae68a1e5314..1f373fd47ccc2b04d3cac05a3d247f12456f3f20 100644 --- a/test/vinyl/replica_quota.test.lua +++ b/test/vinyl/replica_quota.test.lua @@ -4,12 +4,25 @@ box.schema.user.grant('guest', 'replication') s = box.schema.space.create('test', { engine = 'vinyl' }) _ = s:create_index('pk', {run_count_per_level = 1}) - --- Send > 2 MB to replica. -pad = string.rep('x', 1100) -for i = 1,1000 do s:insert{i, pad} end +_ = s:create_index('sk', {unique = false, parts = {2, 'unsigned'}}) + +test_run:cmd("setopt delimiter ';'") +pad = string.rep('x', 10000); +function fill() + for i = 1, 50 do + box.begin() + for j = 1, 10 do + s:replace{math.random(100), math.random(100), pad} + end + box.commit() + end +end; +test_run:cmd("setopt delimiter ''"); + +-- Send > 1 MB to replica. +fill() box.snapshot() -for i = 1001,2000 do s:insert{i, pad} end +fill() -- Replica has memory limit set to 1 MB so replication would hang -- if the scheduler didn't work on the destination. @@ -26,7 +39,7 @@ _ = test_run:wait_lsn('replica', 'default') -- Check vinyl_timeout is ignored on 'subscribe' (gh-3087). _ = test_run:cmd("stop server replica") -for i = 2001,3000 do s:insert{i, pad} end +fill() _ = test_run:cmd("start server replica") _ = test_run:wait_lsn('replica', 'default') @@ -36,7 +49,7 @@ _ = test_run:cmd("stop server replica") _ = test_run:cmd("cleanup server replica") box.snapshot() -for i = 3001,4000 do s:insert{i, pad} end +fill() _ = test_run:cmd("start server replica") -- join _ = test_run:cmd("stop server replica")