diff --git a/cfg/core_cfg.cfg_tmpl b/cfg/core_cfg.cfg_tmpl index f2383cb0c1858ee91a7b0c10d00b9fe4fc5fdcf4..b2b930d91892b63d1aa571caac9200e175402b7b 100644 --- a/cfg/core_cfg.cfg_tmpl +++ b/cfg/core_cfg.cfg_tmpl @@ -37,9 +37,6 @@ logger_nonblock=1 # delay between loop iteraions io_collect_interval=0.0 -# do not write snapshot faster then snap_io_rate_limit MBytes/sec -snap_io_rate_limit=0.0 - # size of listen backlog backlog=1024 diff --git a/cfg/tarantool_feeder_cfg.c b/cfg/tarantool_feeder_cfg.c index bb33f996722b115250b484daa8701278f74e3f15..3e48efce7485d063a53e976f90d25b0daca7d101 100644 --- a/cfg/tarantool_feeder_cfg.c +++ b/cfg/tarantool_feeder_cfg.c @@ -40,7 +40,6 @@ fill_default_tarantool_cfg(tarantool_cfg *c) { c->logger = NULL; c->logger_nonblock = 1; c->io_collect_interval = 0; - c->snap_io_rate_limit = 0; c->backlog = 1024; c->readahead = 16320; c->wal_feeder_bind_ipaddr = NULL; @@ -86,9 +85,6 @@ static NameAtom _name__logger_nonblock[] = { static NameAtom _name__io_collect_interval[] = { { "io_collect_interval", -1, NULL } }; -static NameAtom _name__snap_io_rate_limit[] = { - { "snap_io_rate_limit", -1, NULL } -}; static NameAtom _name__backlog[] = { { "backlog", -1, NULL } }; @@ -248,14 +244,6 @@ acceptValue(tarantool_cfg* c, OptDef* opt, int check_rdonly) { if ( (c->io_collect_interval == 0 || c->io_collect_interval == -HUGE_VAL || c->io_collect_interval == HUGE_VAL) && errno == ERANGE) return CNF_WRONGRANGE; } - else if ( cmpNameAtoms( opt->name, _name__snap_io_rate_limit) ) { - if (opt->paramType != numberType ) - return CNF_WRONGTYPE; - errno = 0; - c->snap_io_rate_limit = strtod(opt->paramValue.numberval, NULL); - if ( (c->snap_io_rate_limit == 0 || c->snap_io_rate_limit == -HUGE_VAL || c->snap_io_rate_limit == HUGE_VAL) && errno == ERANGE) - return CNF_WRONGRANGE; - } else if ( cmpNameAtoms( opt->name, _name__backlog) ) { if (opt->paramType != numberType ) return CNF_WRONGTYPE; @@ -421,7 +409,6 @@ typedef enum IteratorState { S_name__logger, 
S_name__logger_nonblock, S_name__io_collect_interval, - S_name__snap_io_rate_limit, S_name__backlog, S_name__readahead, S_name__wal_feeder_bind_ipaddr, @@ -578,17 +565,6 @@ tarantool_cfg_iterator_next(tarantool_cfg_iterator_t* i, tarantool_cfg *c, char } sprintf(*v, "%g", c->io_collect_interval); snprintf(buf, PRINTBUFLEN-1, "io_collect_interval"); - i->state = S_name__snap_io_rate_limit; - return buf; - case S_name__snap_io_rate_limit: - *v = malloc(32); - if (*v == NULL) { - free(i); - out_warning(CNF_NOMEMORY, "No memory to output value"); - return NULL; - } - sprintf(*v, "%g", c->snap_io_rate_limit); - snprintf(buf, PRINTBUFLEN-1, "snap_io_rate_limit"); i->state = S_name__backlog; return buf; case S_name__backlog: diff --git a/cfg/tarantool_feeder_cfg.cfg b/cfg/tarantool_feeder_cfg.cfg index 31d357f247ec27ad9fd4f1de1b0ea65acab6f621..f6c7690bdd38b73ad4c9aadaae7208fdeda82204 100644 --- a/cfg/tarantool_feeder_cfg.cfg +++ b/cfg/tarantool_feeder_cfg.cfg @@ -39,9 +39,6 @@ logger_nonblock = 1 # delay between loop iteraions io_collect_interval = 0 -# do not write snapshot faster then snap_io_rate_limit MBytes/sec -snap_io_rate_limit = 0 - # size of listen backlog backlog = 1024 diff --git a/cfg/tarantool_feeder_cfg.cfg_tmpl b/cfg/tarantool_feeder_cfg.cfg_tmpl index c3ce41678691dad8be0808a6ee3095b57cb01b95..c87a3980a1a4d85110054b9be70f25c0f598a697 100644 --- a/cfg/tarantool_feeder_cfg.cfg_tmpl +++ b/cfg/tarantool_feeder_cfg.cfg_tmpl @@ -42,9 +42,6 @@ logger_nonblock=1 # delay between loop iteraions io_collect_interval=0.0 -# do not write snapshot faster then snap_io_rate_limit MBytes/sec -snap_io_rate_limit=0.0 - # size of listen backlog backlog=1024 diff --git a/cfg/tarantool_feeder_cfg.h b/cfg/tarantool_feeder_cfg.h index 0efdbfa1b0f95f4d3bfd9cf5b2f4f5acf1e17945..0f279b3e364f0065657a4d4deb5828ed715223d0 100644 --- a/cfg/tarantool_feeder_cfg.h +++ b/cfg/tarantool_feeder_cfg.h @@ -54,9 +54,6 @@ typedef struct tarantool_cfg { /* delay between loop iteraions */ double 
io_collect_interval; - /* do not write snapshot faster then snap_io_rate_limit MBytes/sec */ - double snap_io_rate_limit; - /* size of listen backlog */ int32_t backlog; diff --git a/cfg/tarantool_silverbox_cfg.c b/cfg/tarantool_silverbox_cfg.c index caf93de716273b012481883fad04077c67807b05..10c9d296c1856accf346a4787e31a655a31e7dee 100644 --- a/cfg/tarantool_silverbox_cfg.c +++ b/cfg/tarantool_silverbox_cfg.c @@ -40,7 +40,6 @@ fill_default_tarantool_cfg(tarantool_cfg *c) { c->logger = NULL; c->logger_nonblock = 1; c->io_collect_interval = 0; - c->snap_io_rate_limit = 0; c->backlog = 1024; c->readahead = 16320; c->snap_dir = strdup("."); @@ -55,11 +54,14 @@ fill_default_tarantool_cfg(tarantool_cfg *c) { c->memcached_namespace = 23; c->memcached_expire_per_loop = 1024; c->memcached_expire_full_sweep = 3600; + c->snap_io_rate_limit = 0; c->rows_per_wal = 500000; c->wal_fsync_delay = 0; c->wal_writer_inbox_size = 128; c->local_hot_standby = 0; c->wal_dir_rescan_delay = 0.1; + c->panic_on_snap_error = 1; + c->panic_on_wal_error = 0; c->remote_hot_standby = 0; c->wal_feeder_ipaddr = NULL; c->wal_feeder_port = 0; @@ -129,9 +131,6 @@ static NameAtom _name__logger_nonblock[] = { static NameAtom _name__io_collect_interval[] = { { "io_collect_interval", -1, NULL } }; -static NameAtom _name__snap_io_rate_limit[] = { - { "snap_io_rate_limit", -1, NULL } -}; static NameAtom _name__backlog[] = { { "backlog", -1, NULL } }; @@ -168,6 +167,9 @@ static NameAtom _name__memcached_expire_per_loop[] = { static NameAtom _name__memcached_expire_full_sweep[] = { { "memcached_expire_full_sweep", -1, NULL } }; +static NameAtom _name__snap_io_rate_limit[] = { + { "snap_io_rate_limit", -1, NULL } +}; static NameAtom _name__rows_per_wal[] = { { "rows_per_wal", -1, NULL } }; @@ -183,6 +185,12 @@ static NameAtom _name__local_hot_standby[] = { static NameAtom _name__wal_dir_rescan_delay[] = { { "wal_dir_rescan_delay", -1, NULL } }; +static NameAtom _name__panic_on_snap_error[] = { + { 
"panic_on_snap_error", -1, NULL } +}; +static NameAtom _name__panic_on_wal_error[] = { + { "panic_on_wal_error", -1, NULL } +}; static NameAtom _name__remote_hot_standby[] = { { "remote_hot_standby", -1, NULL } }; @@ -367,14 +375,6 @@ acceptValue(tarantool_cfg* c, OptDef* opt, int check_rdonly) { if ( (c->io_collect_interval == 0 || c->io_collect_interval == -HUGE_VAL || c->io_collect_interval == HUGE_VAL) && errno == ERANGE) return CNF_WRONGRANGE; } - else if ( cmpNameAtoms( opt->name, _name__snap_io_rate_limit) ) { - if (opt->paramType != numberType ) - return CNF_WRONGTYPE; - errno = 0; - c->snap_io_rate_limit = strtod(opt->paramValue.numberval, NULL); - if ( (c->snap_io_rate_limit == 0 || c->snap_io_rate_limit == -HUGE_VAL || c->snap_io_rate_limit == HUGE_VAL) && errno == ERANGE) - return CNF_WRONGRANGE; - } else if ( cmpNameAtoms( opt->name, _name__backlog) ) { if (opt->paramType != numberType ) return CNF_WRONGTYPE; @@ -495,6 +495,14 @@ acceptValue(tarantool_cfg* c, OptDef* opt, int check_rdonly) { return CNF_WRONGRANGE; c->memcached_expire_full_sweep = i32; } + else if ( cmpNameAtoms( opt->name, _name__snap_io_rate_limit) ) { + if (opt->paramType != numberType ) + return CNF_WRONGTYPE; + errno = 0; + c->snap_io_rate_limit = strtod(opt->paramValue.numberval, NULL); + if ( (c->snap_io_rate_limit == 0 || c->snap_io_rate_limit == -HUGE_VAL || c->snap_io_rate_limit == HUGE_VAL) && errno == ERANGE) + return CNF_WRONGRANGE; + } else if ( cmpNameAtoms( opt->name, _name__rows_per_wal) ) { if (opt->paramType != numberType ) return CNF_WRONGTYPE; @@ -547,6 +555,28 @@ acceptValue(tarantool_cfg* c, OptDef* opt, int check_rdonly) { if ( (c->wal_dir_rescan_delay == 0 || c->wal_dir_rescan_delay == -HUGE_VAL || c->wal_dir_rescan_delay == HUGE_VAL) && errno == ERANGE) return CNF_WRONGRANGE; } + else if ( cmpNameAtoms( opt->name, _name__panic_on_snap_error) ) { + if (opt->paramType != numberType ) + return CNF_WRONGTYPE; + errno = 0; + long int i32 = 
strtol(opt->paramValue.numberval, NULL, 10); + if (i32 == 0 && errno == EINVAL) + return CNF_WRONGINT; + if ( (i32 == LONG_MIN || i32 == LONG_MAX) && errno == ERANGE) + return CNF_WRONGRANGE; + c->panic_on_snap_error = i32; + } + else if ( cmpNameAtoms( opt->name, _name__panic_on_wal_error) ) { + if (opt->paramType != numberType ) + return CNF_WRONGTYPE; + errno = 0; + long int i32 = strtol(opt->paramValue.numberval, NULL, 10); + if (i32 == 0 && errno == EINVAL) + return CNF_WRONGINT; + if ( (i32 == LONG_MIN || i32 == LONG_MAX) && errno == ERANGE) + return CNF_WRONGRANGE; + c->panic_on_wal_error = i32; + } else if ( cmpNameAtoms( opt->name, _name__remote_hot_standby) ) { if (opt->paramType != numberType ) return CNF_WRONGTYPE; @@ -769,7 +799,6 @@ typedef enum IteratorState { S_name__logger, S_name__logger_nonblock, S_name__io_collect_interval, - S_name__snap_io_rate_limit, S_name__backlog, S_name__readahead, S_name__snap_dir, @@ -782,11 +811,14 @@ typedef enum IteratorState { S_name__memcached_namespace, S_name__memcached_expire_per_loop, S_name__memcached_expire_full_sweep, + S_name__snap_io_rate_limit, S_name__rows_per_wal, S_name__wal_fsync_delay, S_name__wal_writer_inbox_size, S_name__local_hot_standby, S_name__wal_dir_rescan_delay, + S_name__panic_on_snap_error, + S_name__panic_on_wal_error, S_name__remote_hot_standby, S_name__wal_feeder_ipaddr, S_name__wal_feeder_port, @@ -953,17 +985,6 @@ tarantool_cfg_iterator_next(tarantool_cfg_iterator_t* i, tarantool_cfg *c, char } sprintf(*v, "%g", c->io_collect_interval); snprintf(buf, PRINTBUFLEN-1, "io_collect_interval"); - i->state = S_name__snap_io_rate_limit; - return buf; - case S_name__snap_io_rate_limit: - *v = malloc(32); - if (*v == NULL) { - free(i); - out_warning(CNF_NOMEMORY, "No memory to output value"); - return NULL; - } - sprintf(*v, "%g", c->snap_io_rate_limit); - snprintf(buf, PRINTBUFLEN-1, "snap_io_rate_limit"); i->state = S_name__backlog; return buf; case S_name__backlog: @@ -1093,6 +1114,17 @@ 
tarantool_cfg_iterator_next(tarantool_cfg_iterator_t* i, tarantool_cfg *c, char } sprintf(*v, "%"PRId32, c->memcached_expire_full_sweep); snprintf(buf, PRINTBUFLEN-1, "memcached_expire_full_sweep"); + i->state = S_name__snap_io_rate_limit; + return buf; + case S_name__snap_io_rate_limit: + *v = malloc(32); + if (*v == NULL) { + free(i); + out_warning(CNF_NOMEMORY, "No memory to output value"); + return NULL; + } + sprintf(*v, "%g", c->snap_io_rate_limit); + snprintf(buf, PRINTBUFLEN-1, "snap_io_rate_limit"); i->state = S_name__rows_per_wal; return buf; case S_name__rows_per_wal: @@ -1148,6 +1180,28 @@ tarantool_cfg_iterator_next(tarantool_cfg_iterator_t* i, tarantool_cfg *c, char } sprintf(*v, "%g", c->wal_dir_rescan_delay); snprintf(buf, PRINTBUFLEN-1, "wal_dir_rescan_delay"); + i->state = S_name__panic_on_snap_error; + return buf; + case S_name__panic_on_snap_error: + *v = malloc(32); + if (*v == NULL) { + free(i); + out_warning(CNF_NOMEMORY, "No memory to output value"); + return NULL; + } + sprintf(*v, "%"PRId32, c->panic_on_snap_error); + snprintf(buf, PRINTBUFLEN-1, "panic_on_snap_error"); + i->state = S_name__panic_on_wal_error; + return buf; + case S_name__panic_on_wal_error: + *v = malloc(32); + if (*v == NULL) { + free(i); + out_warning(CNF_NOMEMORY, "No memory to output value"); + return NULL; + } + sprintf(*v, "%"PRId32, c->panic_on_wal_error); + snprintf(buf, PRINTBUFLEN-1, "panic_on_wal_error"); i->state = S_name__remote_hot_standby; return buf; case S_name__remote_hot_standby: diff --git a/cfg/tarantool_silverbox_cfg.cfg b/cfg/tarantool_silverbox_cfg.cfg index 8a3668cdc5940f96f0a7727c0b2e2732d162c9a3..de8d1edb3e11d37d50ef0f03a2871435303cb78c 100644 --- a/cfg/tarantool_silverbox_cfg.cfg +++ b/cfg/tarantool_silverbox_cfg.cfg @@ -39,9 +39,6 @@ logger_nonblock = 1 # delay between loop iteraions io_collect_interval = 0 -# do not write snapshot faster then snap_io_rate_limit MBytes/sec -snap_io_rate_limit = 0 - # size of listen backlog backlog = 1024 @@ 
-79,6 +76,9 @@ memcached_expire_per_loop = 1024 # tarantool will try iterate all rows within this time memcached_expire_full_sweep = 3600 +# do not write snapshot faster then snap_io_rate_limit MBytes/sec +snap_io_rate_limit = 0 + # Write no more rows in WAL rows_per_wal = 500000 @@ -96,6 +96,11 @@ local_hot_standby = 0 # delay in fractional seconds between successive re-readings of wal_dir wal_dir_rescan_delay = 0.1 +# panic if where is error reading snap or wal +# be default panic any snapshot reading error and ignore errors then reading wals +panic_on_snap_error = 1 +panic_on_wal_error = 0 + # Remote hot standby (if enabled server will run in hot standby mode # continuously fetching WAL records from wal_feeder_ipaddr:wal_feeder_port remote_hot_standby = 0 diff --git a/cfg/tarantool_silverbox_cfg.cfg_tmpl b/cfg/tarantool_silverbox_cfg.cfg_tmpl index 264a62d751a3ac9f7acd952d9559e93a789b72e8..444d681863f65c09089cae2923c0d7424b930969 100644 --- a/cfg/tarantool_silverbox_cfg.cfg_tmpl +++ b/cfg/tarantool_silverbox_cfg.cfg_tmpl @@ -42,9 +42,6 @@ logger_nonblock=1 # delay between loop iteraions io_collect_interval=0.0 -# do not write snapshot faster then snap_io_rate_limit MBytes/sec -snap_io_rate_limit=0.0 - # size of listen backlog backlog=1024 @@ -79,6 +76,10 @@ memcached_expire_per_loop=1024 # tarantool will try iterate all rows within this time memcached_expire_full_sweep=3600 + +# do not write snapshot faster then snap_io_rate_limit MBytes/sec +snap_io_rate_limit=0.0 + # Write no more rows in WAL rows_per_wal=500000 @@ -95,6 +96,12 @@ local_hot_standby=0 # delay in fractional seconds between successive re-readings of wal_dir wal_dir_rescan_delay=0.1 + +# panic if where is error reading snap or wal +# be default panic any snapshot reading error and ignore errors then reading wals +panic_on_snap_error=1 +panic_on_wal_error=0 + # Remote hot standby (if enabled server will run in hot standby mode # continuously fetching WAL records from 
wal_feeder_ipaddr:wal_feeder_port remote_hot_standby=0 diff --git a/cfg/tarantool_silverbox_cfg.h b/cfg/tarantool_silverbox_cfg.h index b548b064cce9a9516066f82b2cf2916e93e6d90b..02d6b6559ffb0ad9b22791cd0c42ce369220a10e 100644 --- a/cfg/tarantool_silverbox_cfg.h +++ b/cfg/tarantool_silverbox_cfg.h @@ -72,9 +72,6 @@ typedef struct tarantool_cfg { /* delay between loop iteraions */ double io_collect_interval; - /* do not write snapshot faster then snap_io_rate_limit MBytes/sec */ - double snap_io_rate_limit; - /* size of listen backlog */ int32_t backlog; @@ -114,6 +111,9 @@ typedef struct tarantool_cfg { /* tarantool will try iterate all rows within this time */ int32_t memcached_expire_full_sweep; + /* do not write snapshot faster then snap_io_rate_limit MBytes/sec */ + double snap_io_rate_limit; + /* Write no more rows in WAL */ int32_t rows_per_wal; @@ -135,6 +135,13 @@ typedef struct tarantool_cfg { /* delay in fractional seconds between successive re-readings of wal_dir */ double wal_dir_rescan_delay; + /* + * panic if where is error reading snap or wal + * be default panic any snapshot reading error and ignore errors then reading wals + */ + int32_t panic_on_snap_error; + int32_t panic_on_wal_error; + /* * Remote hot standby (if enabled server will run in hot standby mode * continuously fetching WAL records from wal_feeder_ipaddr:wal_feeder_port diff --git a/core/fiber.c b/core/fiber.c index d800bfa6e6cfa898af011ead42b0f91030ec5930..f3bec8d7d9f7c284c923d9002183edd9aa40294f 100644 --- a/core/fiber.c +++ b/core/fiber.c @@ -384,6 +384,8 @@ fiber_peer_name(struct fiber *fiber) snprintf(fiber->peer_name, sizeof(fiber->peer_name), "%s:%d", inet_ntoa(peer.sin_addr), ntohs(peer.sin_port)); + fiber->cookie = 0; + memcpy(&fiber->cookie, &peer, MIN(sizeof(peer), sizeof(fiber->cookie))); return fiber->peer_name; } diff --git a/core/log_io.c b/core/log_io.c index 963ab38368af36f0bc0353be908186e0a604240f..ed9b9a2876c4acead16bf074e4129cfe17c17300 100644 --- a/core/log_io.c 
+++ b/core/log_io.c @@ -39,7 +39,6 @@ #include <fiber.h> #include <log_io.h> -#include <log_io_internal.h> #include <palloc.h> #include <say.h> #include <third_party/crc32.h> @@ -47,7 +46,9 @@ #include <pickle.h> #include <tbuf.h> -const u16 default_tag = 0; +const u16 snap_tag = -1; +const u16 wal_tag = -2; +const u64 default_cookie = 0; const u32 default_version = 11; const u32 snap_marker_v04 = -1U; const u64 xlog_marker_v04 = -1ULL; @@ -77,6 +78,19 @@ struct log_io_iter { int io_rate_limit; }; +struct row_v04 { + i64 lsn; /* this used to be tid */ + u16 type; + u32 len; + u8 data[]; +} __packed__; + +static inline struct row_v04 *row_v04(const struct tbuf *t) +{ + return (struct row_v04 *)t->data; +} + + int confirm_lsn(struct recovery_state *r, i64 lsn) { @@ -254,6 +268,8 @@ read_rows(struct log_io_iter *i) goto eof; if (row == NULL) { + if (l->class->panic_if_error) + panic("failed to read row"); say_warn("failed to read row"); goto restart; } @@ -430,16 +446,17 @@ find_including_file(struct log_io_class *class, i64 target_lsn) } struct tbuf * -convert_to_v11(struct tbuf *orig, i64 lsn) +convert_to_v11(struct tbuf *orig, u16 tag, i64 lsn) { struct tbuf *row = tbuf_alloc(orig->pool); tbuf_ensure(row, sizeof(struct row_v11)); row->len = sizeof(struct row_v11); row_v11(row)->lsn = lsn; row_v11(row)->tm = 0; - row_v11(row)->len = orig->len; + row_v11(row)->len = orig->len + sizeof(tag) + sizeof(default_cookie); - tbuf_append(row, &default_tag, sizeof(default_tag)); + tbuf_append(row, &tag, sizeof(tag)); + tbuf_append(row, &default_cookie, sizeof(default_cookie)); tbuf_append(row, orig->data, orig->len); return row; } @@ -489,7 +506,7 @@ row_reader_v04(FILE *f, struct palloc_pool *pool) tbuf_append(data, &row_v04(m)->type, sizeof(row_v04(m)->type)); tbuf_append(data, row_v04(m)->data, row_v04(m)->len); - return convert_to_v11(data, row_v04(m)->lsn); + return convert_to_v11(data, wal_tag, row_v04(m)->lsn); } static struct tbuf * @@ -554,22 +571,9 @@ 
close_log(struct log_io **lptr) static int flush_log(struct log_io *l) { - - static double last = 0; - double now; - struct timeval t; - if (fflush(l->f) < 0) return -1; - if (gettimeofday(&t, NULL) < 0) { - say_syserror("gettimeofday"); - return -1; - } - now = t.tv_sec + t.tv_usec / 1000000.; - - if (l->class->fsync_delay == 0 || now - last < l->class->fsync_delay) - return 0; #ifdef Linux if (fdatasync(fileno(l->f)) < 0) { say_syserror("fdatasync"); @@ -581,7 +585,6 @@ flush_log(struct log_io *l) return -1; } #endif - last = now; return 0; } @@ -838,7 +841,7 @@ recover_snap(struct recovery_state *r) say_info("recover from `%s'", snap->filename); while ((row = iter_inner(&i, (void *)1))) { - if (r->snap_row_handler(r, row) < 0) { + if (r->row_handler(r, row) < 0) { result = -1; goto out; } @@ -873,9 +876,14 @@ static int recover_wal(struct recovery_state *r, struct log_io *l) { struct log_io_iter i; - struct tbuf *row; + struct tbuf *row = NULL; int result; + if (setjmp(fiber->exc) != 0) { + result = -1; + goto out; + } + memset(&i, 0, sizeof(i)); iter_open(l, &i, read_rows); @@ -887,7 +895,7 @@ recover_wal(struct recovery_state *r, struct log_io *l) } /* after handler(r, row) returned, row may be modified, do not use it */ - if (r->wal_row_handler(r, row) < 0) { + if (r->row_handler(r, row) < 0) { say_error("row_handler returned error"); result = -1; goto out; @@ -1159,12 +1167,16 @@ static struct tbuf * write_to_disk(void *_state, struct tbuf *t) { static struct log_io *wal = NULL, *wal_to_close = NULL; + static ev_tstamp last_flush = 0; static size_t rows = 0; struct tbuf *reply, *header; struct recovery_state *r = _state; u32 result = 0; int suffix = 0; + /* we're not running inside ev_loop, so update ev_now manually */ + ev_now_update(); + /* caller requested termination */ if (t == NULL) { if (wal != NULL) @@ -1215,9 +1227,12 @@ write_to_disk(void *_state, struct tbuf *t) goto fail; } - if (flush_log(wal) < 0) { - say_syserror("can't flush wal"); - goto 
fail; + if (wal->class->fsync_delay > 0 && ev_now() - last_flush >= wal->class->fsync_delay) { + if (flush_log(wal) < 0) { + say_syserror("can't flush wal"); + goto fail; + } + last_flush = ev_now(); } rows++; @@ -1239,16 +1254,19 @@ write_to_disk(void *_state, struct tbuf *t) } bool -wal_write(struct recovery_state *r, i64 lsn, struct tbuf *data) +wal_write(struct recovery_state *r, u16 tag, u64 cookie, i64 lsn, struct tbuf *row) { - struct tbuf *m = tbuf_alloc(data->pool); + struct tbuf *m = tbuf_alloc(row->pool); struct msg *a; say_debug("wal_write lsn=%" PRIi64, lsn); - tbuf_reserve(m, sizeof(struct wal_write_request) + data->len); + tbuf_reserve(m, sizeof(struct wal_write_request) + sizeof(tag) + sizeof(cookie) + row->len); + m->len = sizeof(struct wal_write_request); wal_write_request(m)->lsn = lsn; - wal_write_request(m)->len = data->len; - memcpy(wal_write_request(m)->data, data->data, data->len); + wal_write_request(m)->len = row->len + sizeof(tag) + sizeof(cookie); + tbuf_append(m, &tag, sizeof(tag)); + tbuf_append(m, &cookie, sizeof(cookie)); + tbuf_append(m, row->data, row->len); if (write_inbox(r->wal_writer->out, m) == false) { say_warn("wal writer inbox is full"); @@ -1265,15 +1283,14 @@ wal_write(struct recovery_state *r, i64 lsn, struct tbuf *data) struct recovery_state * recover_init(const char *snap_dirname, const char *wal_dirname, - row_reader snap_row_reader, row_handler snap_row_handler, row_handler wal_row_handler, - int rows_per_file, double fsync_delay, double snap_io_rate_limit, + row_reader snap_row_reader, row_handler row_handler, + int rows_per_file, double fsync_delay, int inbox_size, int flags, void *data) { struct recovery_state *r = p0alloc(eter_pool, sizeof(*r)); r->wal_timer.data = r; - r->snap_row_handler = snap_row_handler; - r->wal_row_handler = wal_row_handler; + r->row_handler = row_handler; r->data = data; r->snap_class = snap_classes(snap_row_reader, snap_dirname); @@ -1284,13 +1301,24 @@ recover_init(const char 
*snap_dirname, const char *wal_dirname, r->wal_prefered_class->rows_per_file = rows_per_file; r->wal_prefered_class->fsync_delay = fsync_delay; - if ((flags & RECOVER_READONLY) == 0) { + if ((flags & RECOVER_READONLY) == 0) r->wal_writer = spawn_child("wal_writer", inbox_size, write_to_disk, r); - r->snap_io_rate_limit = snap_io_rate_limit * 1024 * 1024; - } + return r; } +void +recovery_setup_panic(struct recovery_state *r, bool on_snap_error, bool on_wal_error) +{ + struct log_io_class **class; + + for (class = r->wal_class; *class; class++) + (*class)->panic_if_error = on_wal_error; + + for (class = r->snap_class; *class; class++) + (*class)->panic_if_error = on_snap_error; +} + static void write_rows(struct log_io_iter *i) { @@ -1328,29 +1356,36 @@ write_rows(struct log_io_iter *i) } void -snapshot_write_row(struct log_io_iter *i, struct tbuf *row) +snapshot_write_row(struct log_io_iter *i, u16 tag, struct tbuf *row) { static int rows; static int bytes; - static struct timeval last; + ev_tstamp elapsed; + static ev_tstamp last = 0; + struct tbuf *wal_row = tbuf_alloc(row->pool); - i->to = row; + tbuf_append(wal_row, &tag, sizeof(tag)); + tbuf_append(wal_row, row->data, row->len); + + i->to = wal_row; if (i->io_rate_limit > 0) { - if (last.tv_sec == 0) - gettimeofday(&last, NULL); - bytes += row->len; + if (last == 0) { + ev_now_update(); + last = ev_now(); + } - while (bytes >= i->io_rate_limit) { - struct timeval now; - useconds_t elapsed; + bytes += row->len + sizeof(struct row_v11); - gettimeofday(&now, NULL); - elapsed = (now.tv_sec - last.tv_sec) * 1000000 + now.tv_usec - last.tv_usec; + while (bytes >= i->io_rate_limit) { + flush_log(i->log); - if (elapsed < 1000000) - usleep(1000000 - elapsed); + ev_now_update(); + elapsed = ev_now() - last; + if (elapsed < 1) + usleep(((1 - elapsed) * 1000000)); - gettimeofday(&last, NULL); + ev_now_update(); + last = ev_now(); bytes -= i->io_rate_limit; } } diff --git a/core/log_io_remote.c b/core/log_io_remote.c index 
d25ca20456644356e01debde4be8e896e10f1035..fc8713702ef8ff31a3d0abf353f6a8565dd87910 100644 --- a/core/log_io_remote.c +++ b/core/log_io_remote.c @@ -36,34 +36,43 @@ #include <say.h> #include <log_io.h> -#include <log_io_internal.h> +#include <pickle.h> struct remote_state { struct recovery_state *r; int (*handler) (struct recovery_state * r, struct tbuf *row); }; +static u32 +row_v11_len(struct tbuf *r) +{ + if (r->len < sizeof(struct row_v11)) + return 0; + + if (r->len < sizeof(struct row_v11) + row_v11(r)->len) + return 0; + + return sizeof(struct row_v11) + row_v11(r)->len; +} + static struct tbuf * -row_reader_v11(struct palloc_pool *pool) +row_reader_v11() { const int header_size = sizeof(struct row_v11); - struct tbuf *m = tbuf_alloc(pool); - tbuf_ensure(m, header_size); + struct tbuf *m; - if (fiber_read(m->data, header_size) != header_size) { - say_error("unexpected eof reading row header"); - return NULL; - } - tbuf_ensure(m, header_size + row_v11(m)->len); - m->len = header_size + row_v11(m)->len; + for (;;) { + if (row_v11_len(fiber->rbuf) != 0) { + m = tbuf_split(fiber->rbuf, row_v11_len(fiber->rbuf)); + say_debug("read row bytes:%" PRIu32 " %s", m->len, tbuf_to_hex(m)); + return m; + } - if (fiber_read(row_v11(m)->data, row_v11(m)->len) != row_v11(m)->len) { - say_error("unexpected eof reading row body"); - return NULL; + if (fiber_bread(fiber->rbuf, header_size) <= 0) { + say_error("unexpected eof reading row header"); + return NULL; + } } - - say_debug("read row bytes:%" PRIu32 " %s", m->len, tbuf_to_hex(m)); - return m; } static struct tbuf * @@ -117,6 +126,7 @@ remote_read_row(i64 initial_lsn) say_info("will retry every %i second", reconnect_delay); warning_said = true; } + fiber_close(); fiber_sleep(reconnect_delay); } } @@ -127,14 +137,20 @@ pull_from_remote(void *state) struct remote_state *h = state; struct tbuf *row; + if (setjmp(fiber->exc) != 0) + fiber_close(); + for (;;) { row = remote_read_row(h->r->confirmed_lsn + 1); h->r->recovery_lag 
= ev_now() - row_v11(row)->tm; + h->r->recovery_last_update_tstamp = ev_now(); - if (h->handler(h->r, row) < 0) + if (h->handler(h->r, row) < 0) { + fiber_close(); continue; + } - prelease_after(fiber->pool, 128 * 1024); + fiber_gc(); } } @@ -143,15 +159,19 @@ default_remote_row_handler(struct recovery_state *r, struct tbuf *row) { struct tbuf *data; i64 lsn = row_v11(row)->lsn; + u16 tag; /* save row data since wal_row_handler may clobber it */ data = tbuf_alloc(row->pool); tbuf_append(data, row_v11(row)->data, row_v11(row)->len); - if (r->wal_row_handler(r, row) < 0) + if (r->row_handler(r, row) < 0) panic("replication failure: can't apply row"); - if (wal_write(r, lsn, data) == false) + tag = read_u16(data); + (void)read_u64(data); /* drop the cookie */ + + if (wal_write(r, tag, r->cookie, lsn, data) == false) panic("replication failure: can't write row to WAL"); next_lsn(r, lsn); @@ -193,6 +213,7 @@ recover_follow_remote(struct recovery_state *r, char *ip_addr, int port, memcpy(&addr->sin_addr.s_addr, &server, sizeof(server)); addr->sin_port = htons(port); f->data = addr; + memcpy(&r->cookie, &addr, MIN(sizeof(r->cookie), sizeof(addr))); fiber_call(f); return f; } diff --git a/core/pickle.c b/core/pickle.c index e0b770ad589f598c9fc32b24b6c2d2589e858a33..5f385cf50455aa78693378e46d4fec9cde4e1964 100644 --- a/core/pickle.c +++ b/core/pickle.c @@ -79,47 +79,22 @@ write_varint32(struct tbuf *b, u32 value) append_byte(b, (u8)((value) & 0x7F)); } -u16 -read_u16(struct tbuf *b) -{ - if (b->len < 2) - raise(ERR_CODE_UNKNOWN_ERROR, "buffer too short"); - - u16 r = *(u16 *)b->data; /* FIXME: endianess & aligment */ - b->size -= 2; - b->len -= 2; - b->data += 2; - - return r; -} - -u32 -read_u32(struct tbuf *b) -{ - if (b->len < 4) - raise(ERR_CODE_UNKNOWN_ERROR, "buffer too short"); - - u32 r = *(u32 *)b->data; /* FIXME: endianess & aligment */ - b->size -= 4; - b->len -= 4; - b->data += 4; - - return r; -} - -u8 -read_u8(struct tbuf *b) -{ - if (b->len < 1) - 
raise(ERR_CODE_UNKNOWN_ERROR, "buffer too short"); - - u8 r = *(u8 *)b->data; - b->size -= 1; - b->len -= 1; - b->data += 1; +#define read_u(bits) \ + u##bits read_u##bits(struct tbuf *b) \ + { \ + if (b->len < (bits)/8) \ + raise(ERR_CODE_UNKNOWN_ERROR, "buffer too short"); \ + u##bits r = *(u##bits *)b->data; \ + b->size -= (bits)/8; \ + b->len -= (bits)/8; \ + b->data += (bits)/8; \ + return r; \ + } - return r; -} +read_u(8) +read_u(16) +read_u(32) +read_u(64) u32 read_varint32(struct tbuf *buf) diff --git a/core/tarantool.c b/core/tarantool.c index 08fa581d01e978be06a34cf8a031a82525d24cae..df5c458ed48285e5cbf0def05cdb3fb6b36e2f33 100644 --- a/core/tarantool.c +++ b/core/tarantool.c @@ -44,7 +44,6 @@ #include <fiber.h> #include <iproto.h> #include <log_io.h> -#include <log_io_internal.h> #include <palloc.h> #include <salloc.h> #include <say.h> diff --git a/include/fiber.h b/include/fiber.h index 073923524d0505550e86ecb9df90fdf9ab4665be..63e21ad58a2a8441908b3048eecf392cc76cd206 100644 --- a/include/fiber.h +++ b/include/fiber.h @@ -81,6 +81,7 @@ struct fiber { void *data; + u64 cookie; bool has_peer; char peer_name[32]; bool reading_inbox; diff --git a/include/log_io.h b/include/log_io.h index 76c7dd6fc96d2ba37b69a9ece5ca750106220827..c97be26c2722606789b53b13f1462c52b09d480c 100644 --- a/include/log_io.h +++ b/include/log_io.h @@ -37,16 +37,67 @@ #define RECOVER_READONLY 1 -extern const u16 default_tag; +extern const u16 wal_tag, snap_tag; +extern const u64 default_cookie; extern const u32 default_version; -struct log_io; struct recovery_state; typedef int (row_handler) (struct recovery_state *, struct tbuf *); typedef struct tbuf *(row_reader) (FILE *f, struct palloc_pool *pool); -struct row_v04 { - i64 lsn; /* this used to be tid */ - u16 type; + +enum log_mode { + LOG_READ, + LOG_WRITE +}; + +struct log_io_class { + row_reader *reader; + u64 marker, eof_marker; + size_t marker_size, eof_marker_size; + size_t rows_per_file; + double fsync_delay; + bool 
panic_if_error; + + const char *filetype; + const char *version; + const char *suffix; + const char *dirname; +}; + +struct log_io { + struct log_io_class *class; + FILE *f; + + ev_stat stat; + enum log_mode mode; + size_t rows; + size_t retry; + char filename[PATH_MAX + 1]; +}; + +struct recovery_state { + i64 lsn, confirmed_lsn; + + struct log_io *current_wal; /* the WAL we'r currently reading/writing from/to */ + struct log_io_class **snap_class, **wal_class, *snap_prefered_class, *wal_prefered_class; + struct child *wal_writer; + + /* row_handler will be presented by most recent format of data + log_io_class->reader is responsible of converting data from old format */ + row_handler *row_handler; + + ev_timer wal_timer; + ev_tstamp recovery_lag, recovery_last_update_tstamp; + + int snap_io_rate_limit; + u64 cookie; + + /* pointer to user supplied custom data */ + void *data; +}; + +struct wal_write_request { + i64 lsn; u32 len; u8 data[]; } __packed__; @@ -60,30 +111,24 @@ struct row_v11 { u8 data[]; } __packed__; -static inline struct row_v04 *row_v04(const struct tbuf *t) -{ - return (struct row_v04 *)t->data; -} - static inline struct row_v11 *row_v11(const struct tbuf *t) { return (struct row_v11 *)t->data; } -struct tbuf *convert_to_v11(struct tbuf *orig, i64 lsn); +struct tbuf *convert_to_v11(struct tbuf *orig, u16 tag, i64 lsn); struct recovery_state *recover_init(const char *snap_dirname, const char *xlog_dirname, - row_reader snap_row_reader, row_handler snap_row_handler, - row_handler xlog_row_handler, int rows_per_file, - double fsync_delay, double snap_io_rate_limit, int inbox_size, + row_reader snap_row_reader, row_handler row_handler, + int rows_per_file, double fsync_delay, int inbox_size, int flags, void *data); int recover(struct recovery_state *, i64 lsn); void recover_follow(struct recovery_state *r, ev_tstamp wal_dir_rescan_delay); void recover_finalize(struct recovery_state *r); -bool wal_write(struct recovery_state *r, i64 lsn, struct tbuf 
*data); +bool wal_write(struct recovery_state *r, u16 tag, u64 cookie, i64 lsn, struct tbuf *data); + +void recovery_setup_panic(struct recovery_state *r, bool on_snap_error, bool on_wal_error); -/* recovery accessors */ -struct palloc_pool *recovery_pool(struct recovery_state *r); int confirm_lsn(struct recovery_state *r, i64 lsn); int64_t next_lsn(struct recovery_state *r, i64 new_lsn); @@ -95,6 +140,7 @@ struct fiber *recover_follow_remote(struct recovery_state *r, char *ip_addr, int int (*handler) (struct recovery_state *r, struct tbuf *row)); struct log_io_iter; -void snapshot_write_row(struct log_io_iter *i, struct tbuf *row); +void snapshot_write_row(struct log_io_iter *i, u16 tag, struct tbuf *row); void snapshot_save(struct recovery_state *r, void (*loop) (struct log_io_iter *)); + #endif diff --git a/include/log_io_internal.h b/include/log_io_internal.h deleted file mode 100644 index 056d3389edc59eeee2cd7f8d14343ac783be475f..0000000000000000000000000000000000000000 --- a/include/log_io_internal.h +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright (C) 2010 Mail.RU - * Copyright (C) 2010 Yuriy Vostrikov - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. 
IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE - * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL - * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS - * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) - * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT - * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY - * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF - * SUCH DAMAGE. - */ - -#ifndef TARANTOOL_LOG_IO_INTERNAL_H -#define TARANTOOL_LOG_IO_INTERNAL_H - -enum log_mode { - LOG_READ, - LOG_WRITE -}; - -struct log_io_class { - row_reader *reader; - u64 marker, eof_marker; - size_t marker_size, eof_marker_size; - size_t rows_per_file; - double fsync_delay; - - const char *filetype; - const char *version; - const char *suffix; - const char *dirname; -}; - -struct log_io { - struct log_io_class *class; - FILE *f; - - ev_stat stat; - enum log_mode mode; - size_t rows; - size_t retry; - char filename[PATH_MAX + 1]; -}; - -struct recovery_state { - i64 lsn, confirmed_lsn; - - struct log_io *current_wal; /* the WAL we'r currently reading/writing from/to */ - struct log_io_class **snap_class, **wal_class, *snap_prefered_class, *wal_prefered_class; - struct child *wal_writer; - - /* handlers will be presented by most new format of data - log_io_class->reader is responsible of converting data from old format */ - row_handler *wal_row_handler, *snap_row_handler; - ev_timer wal_timer; - ev_tstamp recovery_lag; - - int snap_io_rate_limit; - - /* pointer to user supplied custom data */ - void *data; -}; - -struct wal_write_request { - i64 lsn; - u32 len; - u8 data[]; -} __packed__; - -bool wal_write(struct recovery_state *r, i64 lsn, struct tbuf *data); - -#endif diff --git a/include/pickle.h b/include/pickle.h index 250ba240adeb1157b4babe14b41b7aa28adb5d4a..fd98135bab5a4b23dd02a9d15bcc4ec2612a96c8 100644 --- a/include/pickle.h 
+++ b/include/pickle.h @@ -35,6 +35,8 @@ void write_varint32(struct tbuf *b, u32 value); u8 read_u8(struct tbuf *b); u16 read_u16(struct tbuf *b); u32 read_u32(struct tbuf *b); +u64 read_u64(struct tbuf *b); + u32 read_varint32(struct tbuf *buf); void *read_field(struct tbuf *buf); diff --git a/mod/feeder/feeder.c b/mod/feeder/feeder.c index babfe3d18972048c2134ff5b7e8eeb79edba7c30..16957dee7e488655dddc7d3b84364ef9821824e3 100644 --- a/mod/feeder/feeder.c +++ b/mod/feeder/feeder.c @@ -80,7 +80,7 @@ recover_feed_slave(int sock) send_row(NULL, ver); log_io = recover_init(NULL, cfg.wal_feeder_dir, - NULL, NULL, send_row, 0, 0, 0, 64, RECOVER_READONLY, false); + NULL, send_row, 0, 0, 64, RECOVER_READONLY, false); recover(log_io, lsn); recover_follow(log_io, 0.1); diff --git a/mod/silverbox/box.c b/mod/silverbox/box.c index 869a06f2ce43e5f116b525c653a5207ca92e9af6..1d5e94001f7d2294c4c2dd722e054250d60c528c 100644 --- a/mod/silverbox/box.c +++ b/mod/silverbox/box.c @@ -32,7 +32,6 @@ #include <fiber.h> #include <iproto.h> #include <log_io.h> -#include <log_io_internal.h> #include <pickle.h> #include <salloc.h> #include <say.h> @@ -1217,13 +1216,13 @@ box_dispach(struct box_txn *txn, enum box_mode mode, u16 op, struct tbuf *data) if (ret_code == -1) { if (!txn->in_recover) { + fiber_peer_name(fiber); /* fill the cookie */ struct tbuf *t = tbuf_alloc(fiber->pool); - tbuf_append(t, &default_tag, sizeof(default_tag)); tbuf_append(t, &op, sizeof(op)); tbuf_append(t, req.data, req.len); i64 lsn = next_lsn(recovery_state, 0); - if (!wal_write(recovery_state, lsn, t)) { + if (!wal_write(recovery_state, wal_tag, fiber->cookie, lsn, t)) { ret_code = ERR_CODE_UNKNOWN_ERROR; goto abort; } @@ -1248,12 +1247,14 @@ box_dispach(struct box_txn *txn, enum box_mode mode, u16 op, struct tbuf *data) static int box_xlog_sprint(struct tbuf *buf, const struct tbuf *t) { - struct row_v04 *row = row_v04(t); + struct row_v11 *row = row_v11(t); struct tbuf *b = palloc(fiber->pool, sizeof(*b)); 
b->data = row->data; b->len = row->len; - u32 op = row->type; + u16 tag, op; + u64 cookie; + struct sockaddr_in *peer = (void *)&cookie; u32 n, key_len; void *key; @@ -1264,10 +1265,15 @@ box_xlog_sprint(struct tbuf *buf, const struct tbuf *t) tbuf_printf(buf, "lsn:%" PRIi64 " ", row->lsn); say_debug("b->len:%" PRIu32, b->len); + + tag = read_u16(b); + cookie = read_u64(b); + op = read_u16(b); n = read_u32(b); - tbuf_printf(buf, "%s ", messages_strs[op]); - tbuf_printf(buf, "n:%i ", n); + tbuf_printf(buf, "tm:%.3f t:%"PRIu16 " %s:%d %s n:%i", + row->tm, tag, inet_ntoa(peer->sin_addr), ntohs(peer->sin_port), + messages_strs[op], n); switch (op) { case INSERT: @@ -1342,30 +1348,17 @@ box_snap_reader(FILE *f, struct palloc_pool *pool) if (fread(box_snap_row(row)->data, box_snap_row(row)->data_size, 1, f) != 1) return NULL; - return convert_to_v11(row, 0); + return convert_to_v11(row, snap_tag, 0); } static int -snap_apply(struct recovery_state *r __unused__, struct tbuf *t) +snap_apply(struct box_txn *txn, struct tbuf *t) { struct box_snap_row *row; - struct box_txn *txn = txn_alloc(0); - - /* drop wal header */ - if (tbuf_peek(t, sizeof(struct row_v11)) == NULL) - return -1; - - u16 tag = read_u16(t); - if (tag != 0) - return -1; row = box_snap_row(t); - txn->in_recover = true; txn->n = row->namespace; - if (txn->n == 25) - return 0; - if (!namespace[txn->n].enabled) { say_error("namespace %i is not configured", txn->n); return -1; @@ -1384,14 +1377,29 @@ snap_apply(struct recovery_state *r __unused__, struct tbuf *t) txn->op = INSERT; txn_commit(txn); + return 0; +} + +static int +wal_apply(struct box_txn *txn, struct tbuf *t) +{ + + u64 cookie = read_u64(t); + (void)cookie; + + u16 type = read_u16(t); + if (box_dispach(txn, RW, type, t) != 0) + return -1; + txn_cleanup(txn); return 0; } static int -xlog_apply(struct recovery_state *r __unused__, struct tbuf *t) +recover_row(struct recovery_state *r __unused__, struct tbuf *t) { struct box_txn *txn = txn_alloc(0); + 
int result = -1; txn->in_recover = true; /* drop wal header */ @@ -1399,17 +1407,20 @@ xlog_apply(struct recovery_state *r __unused__, struct tbuf *t) return -1; u16 tag = read_u16(t); - if (tag != 0) - return -1; - - u16 type = read_u16(t); - if (box_dispach(txn, RW, type, t) != 0) + if (tag == wal_tag) { + result = wal_apply(txn, t); + } else if (tag == snap_tag) { + result = snap_apply(txn, t); + } else { + say_error("unknown row tag: %i", (int)tag); return -1; + } txn_cleanup(txn); - return 0; + return result; } + static int snap_print(struct recovery_state *r __unused__, struct tbuf *t) { @@ -1768,11 +1779,14 @@ mod_init(void) } recovery_state = recover_init(cfg.snap_dir, cfg.wal_dir, - box_snap_reader, snap_apply, xlog_apply, - cfg.rows_per_wal, cfg.wal_fsync_delay, cfg.snap_io_rate_limit, + box_snap_reader, recover_row, + cfg.rows_per_wal, cfg.wal_fsync_delay, cfg.wal_writer_inbox_size, init_storage ? RECOVER_READONLY : 0, NULL); + recovery_state->snap_io_rate_limit = cfg.snap_io_rate_limit * 1024 * 1024; + recovery_setup_panic(recovery_state, cfg.panic_on_snap_error, cfg.panic_on_wal_error); + /* initialize hashes _after_ starting wal writer */ if (cfg.memcached != 0) { int n = cfg.memcached_namespace > 0 ? 
cfg.memcached_namespace : MEMCACHED_NAMESPACE; @@ -1867,11 +1881,10 @@ mod_snapshot(struct log_io_iter *i) header.data_size = tuple->bsize; tbuf_reset(row); - tbuf_append(row, &default_tag, sizeof(default_tag)); tbuf_append(row, &header, sizeof(header)); tbuf_append(row, tuple->data, tuple->bsize); - snapshot_write_row(i, row); + snapshot_write_row(i, snap_tag, row); } } } @@ -1885,6 +1898,8 @@ mod_info(struct tbuf *out) tbuf_printf(out, " pid: %i\r\n", getpid()); tbuf_printf(out, " wal_writer_pid: %" PRIi64 "\r\n", (i64)recovery_state->wal_writer->pid); tbuf_printf(out, " lsn: %" PRIi64 "\r\n", recovery_state->confirmed_lsn); + tbuf_printf(out, " recovery_lag: %.3f\r\n", recovery_state->recovery_lag); + tbuf_printf(out, " recovery_last_update: %.3f\r\n", recovery_state->recovery_last_update_tstamp); tbuf_printf(out, " status: %s\r\n", status); } diff --git a/mod/silverbox/box_cfg.cfg_tmpl b/mod/silverbox/box_cfg.cfg_tmpl index 54ca6ca3eeca6bad879afedf460f059f98f45907..1bc8a9bf9d4a9b1a7d8b035dd898384057fa5551 100644 --- a/mod/silverbox/box_cfg.cfg_tmpl +++ b/mod/silverbox/box_cfg.cfg_tmpl @@ -28,6 +28,10 @@ memcached_expire_per_loop=1024 # tarantool will try iterate all rows within this time memcached_expire_full_sweep=3600 + +# do not write snapshot faster than snap_io_rate_limit MBytes/sec +snap_io_rate_limit=0.0 + # Write no more rows in WAL rows_per_wal=500000 @@ -44,6 +48,12 @@ local_hot_standby=0 # delay in fractional seconds between successive re-readings of wal_dir wal_dir_rescan_delay=0.1 + +# panic if there is an error reading snap or wal +# by default panic on any snapshot reading error and ignore errors when reading wals +panic_on_snap_error=1 +panic_on_wal_error=0 + # Remote hot standby (if enabled server will run in hot standby mode # continuously fetching WAL records from wal_feeder_ipaddr:wal_feeder_port remote_hot_standby=0