diff --git a/include/log_io.h b/include/log_io.h index 0450d48b32ced148cd9f10569e5ca7da51cea3f0..c2236c5c419c01437c8a52486937bd70eca221ec 100644 --- a/include/log_io.h +++ b/include/log_io.h @@ -57,9 +57,6 @@ struct log_io_class { row_reader *reader; u64 marker, eof_marker; size_t marker_size, eof_marker_size; - size_t rows_per_file; - /* wal_fsync_delay value for the log class. */ - double fsync_delay; bool panic_if_error; /* Additional flags to apply at open(2) to write. */ @@ -107,8 +104,14 @@ struct wal_writer; struct recovery_state { i64 lsn, confirmed_lsn; - - struct log_io *current_wal; /* the WAL we'r currently reading/writing from/to */ + /* The WAL we're currently reading/writing from/to. */ + struct log_io *current_wal; + /* + * When opening the next WAL, we want to first open + * a new file before closing the previous one. Thus + * we save the old WAL here. + */ + struct log_io *previous_wal; struct log_io_class *snap_class; struct log_io_class *wal_class; struct wal_writer *writer; @@ -123,6 +126,8 @@ struct recovery_state { ev_tstamp recovery_lag, recovery_last_update_tstamp; int snap_io_rate_limit; + int rows_per_wal; + double wal_fsync_delay; u64 cookie; struct wait_lsn wait_lsn; @@ -173,8 +178,8 @@ struct tbuf *convert_to_v11(struct tbuf *orig, u16 tag, u64 cookie, i64 lsn); void recovery_init(const char *snap_dirname, const char *xlog_dirname, row_handler row_handler, - int rows_per_file, const char *wal_mode, - double fsync_delay, + int rows_per_wal, const char *wal_mode, + double wal_fsync_delay, int flags, void *data); void recovery_update_mode(const char *wal_mode, double fsync_delay); void recovery_free(); diff --git a/src/log_io.m b/src/log_io.m index 043f78a0b533cd657251700633273b6e13ebd91a..37e898bb84ffc5e3b245ee282ad7a913b4358bca 100644 --- a/src/log_io.m +++ b/src/log_io.m @@ -169,8 +169,6 @@ v11_class(struct log_io_class *c) c->marker_size = sizeof(marker_v11); c->eof_marker = eof_marker_v11; c->eof_marker_size = sizeof(eof_marker_v11); 
- - c->fsync_delay = 0; } static void @@ -581,6 +579,26 @@ log_io_close(struct log_io **lptr) return r; } +/** Free log_io memory and destroy it cleanly, without side + * effects (for use in the atfork handler). + */ +static void +log_io_atfork(struct log_io **lptr) +{ + struct log_io *l = *lptr; + if (l) { + /* + * Close the file descriptor so that the STDIO buffer + * does not make its way into the respective file in + * fclose(). + */ + close(fileno(l->f)); + fclose(l->f); + free(l); + *lptr = NULL; + } +} + static int log_io_flush(struct log_io *l) { @@ -1205,15 +1223,20 @@ recover_finalize(struct recovery_state *r) } /** - * A pthread_atfork() callback for the child. We fork - * to save a snapshot, and in the child the writer - * thread is not necessary and not present. Make sure - * that atexit() handlers do not try to stop the - * non-existent thread. + * A pthread_atfork() callback for a child process. Today we only + * fork the master process to save a snapshot, and in the child + * the WAL writer thread is not necessary and not present. */ static void wal_writer_child() { + log_io_atfork(&recovery_state->current_wal); + log_io_atfork(&recovery_state->previous_wal); + /* + * Make sure that atexit() handlers in the child do + * not try to stop the non-existent thread. + * The writer is not used in the child. 
+ */ recovery_state->writer = NULL; } @@ -1389,55 +1412,38 @@ wal_writer_pop(struct wal_writer *writer, bool input_was_empty) static int write_to_disk(struct recovery_state *r, struct wal_write_request *req) { - static struct log_io *wal = NULL, *wal_to_close = NULL; static ev_tstamp last_flush = 0; - bool is_bulk_end = req == NULL || STAILQ_NEXT(req, wal_fifo_entry) == NULL; - -#if 0 - /* we're not running inside ev_loop, so update ev_now manually */ - ev_now_update(); -#endif + bool is_bulk_end = STAILQ_NEXT(req, wal_fifo_entry) == NULL; - /* caller requested termination */ - if (req == NULL) { - if (wal != NULL) - log_io_close(&wal); -#if 0 - if (wal_to_close != NULL) - log_io_close(&wal_to_close); - recover_free((struct recovery_state*)_state); -#endif - return 0; - } - - if (wal == NULL) { + if (r->current_wal == NULL) { int unused; /* Open WAL with '.inprogress' suffix. */ - wal = log_io_open_for_write(r, r->wal_class, req->lsn, -1, - &unused); + r->current_wal = + log_io_open_for_write(r, r->wal_class, req->lsn, -1, + &unused); } - else if (wal->rows == 1) { + else if (r->current_wal->rows == 1) { /* rename WAL after first successful write to name * without inprogress suffix*/ - if (log_io_inprogress_rename(wal->filename) != 0) { - say_error("can't rename inprogress wal"); + if (log_io_inprogress_rename(r->current_wal->filename) != 0) { + say_error("can't rename inprogress WAL"); goto fail; } } /* - * Close the file *after* we write the first record - * into the new WAL, since this is when replication - * relays get an inotify alarm (when we close the file), - * and try to reopen the next WAL. In other words, - * make sure that replication realys try to open the - * next WAL only when it exists. + * Close the file *after* we create the new WAL, since + * this is when replication relays get an inotify alarm + * (when we close the file), and try to reopen the next + * WAL. 
In other words, make sure that replication relays + * try to open the next WAL only when it exists. */ - if (wal_to_close != NULL) { - if (log_io_close(&wal_to_close) != 0) + if (r->previous_wal != NULL) { + if (log_io_close(&r->previous_wal) != 0) goto fail; } + struct log_io *wal = r->current_wal; if (wal == NULL) { - say_syserror("can't open wal"); + say_syserror("can't open WAL"); goto fail; } req->marker = marker_v11; @@ -1451,30 +1457,30 @@ write_to_disk(struct recovery_state *r, struct wal_write_request *req) sz += sizeof(req->marker) + sizeof(req->header_crc32c) + req->len; /* Write the request. */ if (fwrite(&req->marker, sz, 1, wal->f) != 1) { - say_syserror("can't write row header to wal"); + say_syserror("can't write row header to WAL"); goto fail; } - /* flush stdio buffer to keep replication in sync */ + /* Flush stdio buffer to keep replication in sync. */ if (is_bulk_end && fflush(wal->f) < 0) { - say_syserror("can't flush wal"); + say_syserror("can't flush WAL"); goto fail; } - if (wal->class->fsync_delay > 0 && - ev_now() - last_flush >= wal->class->fsync_delay) { + if (r->wal_fsync_delay > 0 && + ev_now() - last_flush >= r->wal_fsync_delay) { if (log_io_flush(wal) < 0) { - say_syserror("can't flush wal"); + say_syserror("can't flush WAL"); goto fail; } last_flush = ev_now(); } wal->rows++; - if (wal->class->rows_per_file <= wal->rows || - (req->lsn + 1) % wal->class->rows_per_file == 0) { - wal_to_close = wal; - wal = NULL; + if (r->rows_per_wal <= wal->rows || + (req->lsn + 1) % r->rows_per_wal == 0) { + r->previous_wal = r->current_wal; + r->current_wal = NULL; } req->out_lsn = req->lsn; @@ -1494,6 +1500,9 @@ wal_writer_thread(void *worker_args) bool input_was_empty = true; struct wal_write_request *req; + assert(r->current_wal == NULL); + assert(r->previous_wal == NULL); + tt_pthread_mutex_lock(&writer->mutex); while (writer->is_shutdown == false) { struct wal_fifo input = wal_writer_pop(writer, input_was_empty); @@ -1519,9 +1528,12 @@ 
wal_writer_thread(void *worker_args) * we were able to awake all fibers waiting on the * previous pack. */ + if (r->current_wal != NULL) + log_io_close(&r->current_wal); + if (r->previous_wal != NULL) + log_io_close(&r->previous_wal); if (input_was_empty == false) ev_async_send(&writer->async); - write_to_disk(r, NULL); return NULL; } @@ -1570,15 +1582,16 @@ wal_write(struct recovery_state *r, u16 tag, u16 op, u64 cookie, void recovery_init(const char *snap_dirname, const char *wal_dirname, - row_handler row_handler, int rows_per_file, - const char *wal_mode, double fsync_delay, int flags, void *data) + row_handler row_handler, int rows_per_wal, + const char *wal_mode, double wal_fsync_delay, + int flags, void *data) { assert(recovery_state == NULL); recovery_state = p0alloc(eter_pool, sizeof(struct recovery_state)); struct recovery_state *r = recovery_state; - if (rows_per_file <= 1) - panic("unacceptable value of 'rows_per_file'"); + if (rows_per_wal <= 1) + panic("unacceptable value of 'rows_per_wal'"); r->wal_timer.data = r; r->row_handler = row_handler; @@ -1588,8 +1601,8 @@ recovery_init(const char *snap_dirname, const char *wal_dirname, r->snap_class = snapshot_class_create(snap_dirname); r->wal_class = xlog_class_create(wal_dirname); - r->wal_class->rows_per_file = rows_per_file; - r->wal_class->fsync_delay = fsync_delay; + r->rows_per_wal = rows_per_wal; + r->wal_fsync_delay = wal_fsync_delay; r->wal_class->open_wflags = strcasecmp(wal_mode, "fsync") ? 0 : WAL_SYNC_FLAG; wait_lsn_clear(&r->wait_lsn); @@ -1608,7 +1621,7 @@ recovery_update_mode(const char *mode, double fsync_delay) * to it whenever there is a next lock/unlock of * wal_writer->mutex. 
*/ - r->wal_class->fsync_delay = fsync_delay; + r->wal_fsync_delay = fsync_delay; } void @@ -1622,8 +1635,14 @@ recovery_free() v11_class_free(recovery->snap_class); v11_class_free(recovery->wal_class); - if (recovery->current_wal) + if (recovery->current_wal) { + /* + * Possible if shutting down a replication + * relay or if error during startup. + */ log_io_close(&recovery->current_wal); + } + assert(recovery->previous_wal == NULL); recovery_state = NULL; } diff --git a/src/log_io_remote.m b/src/log_io_remote.m index cb96d3ddb57ef446d5de8594a202fe06bd6e6883..d099afb6a1f7ba41870813800cf243678780587f 100644 --- a/src/log_io_remote.m +++ b/src/log_io_remote.m @@ -40,7 +40,7 @@ #include <pickle.h> static int -default_remote_row_handler(struct recovery_state *r, struct tbuf *row); +remote_apply_row(struct recovery_state *r, struct tbuf *row); static struct tbuf * remote_row_reader_v11() @@ -134,7 +134,7 @@ pull_from_remote(void *state) r->recovery_lag = ev_now() - row_v11(row)->tm; r->recovery_last_update_tstamp = ev_now(); - if (default_remote_row_handler(r, row) < 0) { + if (remote_apply_row(r, row) < 0) { fiber_close(); continue; } @@ -144,7 +144,7 @@ pull_from_remote(void *state) } static int -default_remote_row_handler(struct recovery_state *r, struct tbuf *row) +remote_apply_row(struct recovery_state *r, struct tbuf *row) { struct tbuf *data; i64 lsn = row_v11(row)->lsn; diff --git a/src/replication.m b/src/replication.m index b40ee701ea2d778629dab9fc3b48308bb1ae9257..efc782d9a40aa35893faf7e7a6526e98caa796e6 100644 --- a/src/replication.m +++ b/src/replication.m @@ -139,9 +139,11 @@ static int replication_relay_send_row(struct recovery_state *r __attribute__((unused)), struct tbuf *t); -/*-----------------------------------------------------------------------------*/ -/* replication module */ -/*-----------------------------------------------------------------------------*/ +/* + * ------------------------------------------------------------------------ + * 
replication module + * ------------------------------------------------------------------------ + */ /** Check replication module configuration. */ int diff --git a/src/tarantool.m b/src/tarantool.m index 01b21b93db2aec42a380a5dff73517bb1b42032c..deb7f83bfc9bec525d0d6fee7357ed4194571627 100644 --- a/src/tarantool.m +++ b/src/tarantool.m @@ -279,6 +279,10 @@ snapshot(void *ev, int events __attribute__((unused))) fiber_set_name(fiber, "dumper"); set_proc_title("dumper (%" PRIu32 ")", getppid()); + /* + * Safety: make sure we don't double-write + * parent stdio buffers at exit(). + */ close_all_xcpt(1, sayfd); snapshot_save(recovery_state, mod_snapshot);