diff --git a/core/log_io.c b/core/log_io.c index 3c8a2076dc339f8671cd2d596f105784e58217e8..3d36719285ef250acd8516affed2a87f182e854d 100644 --- a/core/log_io.c +++ b/core/log_io.c @@ -47,21 +47,25 @@ #include <pickle.h> #include <tbuf.h> +const u16 default_tag = 0; +const u32 default_version = 11; const u32 snap_marker_v04 = -1U; const u64 xlog_marker_v04 = -1ULL; const u64 xlog_eof_marker_v04 = 0; -const u32 marker_v05 = 0xba0babed; -const u32 marker_v05_eof = 0x10adab1e; +const u32 marker_v11 = 0xba0babed; +const u32 eof_marker_v11 = 0x10adab1e; const char *snap_suffix = ".snap"; const char *xlog_suffix = ".xlog"; const char *v04 = "0.04\n"; const char *v03 = "0.03\n"; +const char *v11 = "0.11\n"; const char *snap_mark = "SNAP\n"; const char *xlog_mark = "XLOG\n"; #define ROW_EOF (void *)1 static struct tbuf *row_reader_v04(FILE *f, struct palloc_pool *pool); +static struct tbuf *row_reader_v11(FILE *f, struct palloc_pool *pool); struct log_io_iter { struct tarantool_coro coro; @@ -112,35 +116,16 @@ next_lsn(struct recovery_state *r, i64 new_lsn) else r->lsn = new_lsn; - say_debug("next_lsn(%p, %"PRIi64") => %"PRIi64, r, new_lsn, r->lsn); + say_debug("next_lsn(%p, %" PRIi64 ") => %" PRIi64, r, new_lsn, r->lsn); return r->lsn; } -static void -snap_class(struct log_io_class *c) -{ - c->suffix = snap_suffix; - c->filetype = snap_mark; - c->version = v03; - c->eof_marker_size = 0; /* no end marker */ - c->marker = snap_marker_v04; - c->marker_size = sizeof(snap_marker_v04); - c->rows_per_file = 0; -} - -static i64 -row_v04_lsn(const struct tbuf *t) -{ - return row_v04(t)->lsn; -} - static void xlog04_class(struct log_io_class *c) { c->suffix = xlog_suffix; c->filetype = xlog_mark; c->version = v04; - c->row_lsn = row_v04_lsn; c->reader = row_reader_v04; c->marker = xlog_marker_v04; c->marker_size = sizeof(xlog_marker_v04); @@ -151,6 +136,69 @@ xlog04_class(struct log_io_class *c) c->fsync_delay = 0; } +static void +v11_class(struct log_io_class *c) +{ + c->suffix = xlog_suffix; + c->filetype = xlog_mark; + c->version = v11; + c->reader = row_reader_v11; + c->marker = marker_v11; + c->marker_size = sizeof(marker_v11); + c->eof_marker = eof_marker_v11; + c->eof_marker_size = sizeof(eof_marker_v11); + + c->fsync_delay = 0; +} + +static struct log_io_class ** +snap_classes(row_reader snap_row_reader, const char *dirname) +{ + struct log_io_class **c = calloc(3, sizeof(*c)); + if (c == NULL) + panic("calloc"); + + c[0] = calloc(1, sizeof(**c)); + c[1] = calloc(1, sizeof(**c)); + if (c[0] == NULL || c[1] == NULL) + panic("calloc"); + + c[0]->suffix = snap_suffix; + c[0]->filetype = snap_mark; + c[0]->version = v03; + c[0]->eof_marker_size = 0; /* no end marker */ + c[0]->marker = snap_marker_v04; + c[0]->marker_size = sizeof(snap_marker_v04); + c[0]->rows_per_file = 0; + c[0]->reader = snap_row_reader; + + v11_class(c[1]); + c[1]->filetype = c[0]->filetype; + c[1]->suffix = c[0]->suffix; + + c[0]->dirname = c[1]->dirname = dirname; + return c; +} + +static struct log_io_class ** +xlog_classes(const char *dirname) +{ + struct log_io_class **c = calloc(3, sizeof(*c)); + if (c == NULL) + panic("calloc"); + + c[0] = calloc(1, sizeof(**c)); + c[1] = calloc(1, sizeof(**c)); + if (c[0] == NULL || c[1] == NULL) + panic("calloc"); + + xlog04_class(c[0]); + v11_class(c[1]); + + c[0]->dirname = c[1]->dirname = dirname; + return c; +} + static void * iter_inner(struct log_io_iter *i, void *data) { @@ -194,21 +242,24 @@ read_rows(struct log_io_iter *i) fseeko(l->f, marker_offset + 1, SEEK_SET); for (;;) { - say_debug("read_rows: loop start offt %" PRI_OFFT, ftello(l->f)); + say_debug("read_rows: loop start offt 0x%08" PRI_XFFT, ftello(l->f)); if (fread(&magic, l->class->marker_size, 1, l->f) != 1) goto eof; while ((magic & marker_mask) != l->class->marker) { int c = fgetc(l->f); - if (c == EOF) + if (c == EOF) { + say_debug("eof while looking for magic"); goto eof; - magic = (magic << 8) + (c & 0xff); + } + magic >>= 8; + magic |= (((u64)c & 0xff) << ((l->class->marker_size - 1) * 8)); } marker_offset = ftello(l->f) - l->class->marker_size; if (good_offset != marker_offset) - say_warn("skipped %" PRI_OFFT " bytes after %" PRI_OFFT " offset", + say_warn("skipped %" PRI_OFFT " bytes after 0x%08" PRI_XFFT " offset", marker_offset - good_offset, good_offset); - say_debug("magic found at %" PRI_OFFT, marker_offset); + say_debug("magic found at 0x%08" PRI_XFFT, marker_offset); row = l->class->reader(l->f, fiber->pool); if (row == ROW_EOF) @@ -390,6 +441,21 @@ find_including_file(struct log_io_class *class, i64 target_lsn) return *lsn; } +struct tbuf * +convert_to_v11(struct tbuf *orig, i64 lsn) +{ + struct tbuf *row = tbuf_alloc(orig->pool); + tbuf_ensure(row, sizeof(struct row_v11)); + row->len = sizeof(struct row_v11); + row_v11(row)->lsn = lsn; + row_v11(row)->tm = 0; + row_v11(row)->len = orig->len; + + tbuf_append(row, &default_tag, sizeof(default_tag)); + tbuf_append(row, orig->data, orig->len); + return row; +} + static struct tbuf * row_reader_v04(FILE *f, struct palloc_pool *pool) { @@ -428,7 +494,51 @@ row_reader_v04(FILE *f, struct palloc_pool *pool) return NULL; } - say_debug("read row success lsn:%" PRIi64, row_v04(m)->lsn); + say_debug("read row v04 success lsn:%" PRIi64, row_v04(m)->lsn); + + /* we're copying row data twice here, it's ok since this is legacy function */ + struct tbuf *data = tbuf_alloc(pool); + tbuf_append(data, &row_v04(m)->type, sizeof(row_v04(m)->type)); + tbuf_append(data, row_v04(m)->data, row_v04(m)->len); + + return convert_to_v11(data, row_v04(m)->lsn); +} + +static struct tbuf * +row_reader_v11(FILE *f, struct palloc_pool *pool) +{ + struct tbuf *m = tbuf_alloc(pool); + + u32 header_crc, data_crc; + + tbuf_ensure(m, sizeof(struct row_v11)); + if (fread(m->data, sizeof(struct row_v11), 1, f) != 1) + return ROW_EOF; + + m->len = offsetof(struct row_v11, data); + + /* header crc32c calculated on <lsn, tm, len, data_crc32c> */ + header_crc = crc32c(0, m->data + offsetof(struct row_v11, lsn), + sizeof(struct row_v11) - offsetof(struct row_v11, lsn)); + + if (row_v11(m)->header_crc32c != header_crc) { + say_error("header crc32c mismatch"); + return NULL; + } + + tbuf_ensure(m, m->len + row_v11(m)->len); + if (fread(row_v11(m)->data, row_v11(m)->len, 1, f) != 1) + return ROW_EOF; + + m->len += row_v11(m)->len; + + data_crc = crc32c(0, row_v11(m)->data, row_v11(m)->len); + if (row_v11(m)->data_crc32c != data_crc) { + say_error("data crc32c mismatch"); + return NULL; + } + + say_debug("read row v11 success lsn:%" PRIi64, row_v11(m)->lsn); return m; } @@ -499,12 +609,15 @@ write_header(struct log_io *l) if (fwrite(l->class->version, strlen(l->class->version), 1, l->f) != 1) return -1; - time(&tm); - ctime_r(&tm, buf); - /* 20 bytes is hardcoded timestring length in silverspoon */ - //buf[19] = '\n'; - if (fwrite(buf, strlen(buf), 1, l->f) != 1) - return -1; + if (strcmp(l->class->version, v11) == 0) { + if (fwrite("\n", 1, 1, l->f) != 1) + return -1; + } else { + time(&tm); + ctime_r(&tm, buf); + if (fwrite(buf, strlen(buf), 1, l->f) != 1) + return -1; + } return 0; } @@ -535,10 +648,10 @@ format_filename(char *filename, struct log_io_class *class, i64 lsn, int suffix) } static struct log_io * -open_for_read(struct recovery_state *recover, struct log_io_class *class, i64 lsn, int suffix, +open_for_read(struct recovery_state *recover, struct log_io_class **class, i64 lsn, int suffix, const char *filename) { - char filetype[32], version[32], buf[32]; + char filetype[32], version[32], buf[256]; struct log_io *l = NULL; char *r; char *error = "unknown error"; @@ -548,13 +661,12 @@ open_for_read(struct recovery_state *recover, struct log_io_class *class, i64 ls goto error; memset(l, 0, sizeof(*l)); l->mode = LOG_READ; - l->class = class; l->stat.data = recover; /* when filename is not null it is forced open for debug reading */ if (filename == NULL) { assert(lsn != 0); - format_filename(l->filename, class, lsn, suffix); + format_filename(l->filename, *class, lsn, suffix); } else { assert(lsn == 0); strncpy(l->filename, filename, PATH_MAX); @@ -580,21 +692,40 @@ open_for_read(struct recovery_state *recover, struct log_io_class *class, i64 ls goto error; } - if (strcmp(class->filetype, filetype) != 0) { + if (strcmp((*class)->filetype, filetype) != 0) { error = "unknown filetype"; goto error; } - if (strcmp(class->version, version) != 0) { - error = "unknown version"; - goto error; + while (*class) { + if (strcmp((*class)->version, version) == 0) + break; + class++; } - r = fgets(buf, sizeof(buf), l->f); /* skip line with time */ - if (r == NULL) { - error = "header reading failed"; + if (*class == NULL) { + error = "unknown version"; goto error; } + l->class = *class; + + if (strcmp(version, v11) == 0) { + for (;;) { + r = fgets(buf, sizeof(buf), l->f); + if (r == NULL) { + error = "header reading failed"; + goto error; + } + if (strcmp(r, "\n") == 0 || strcmp(r, "\r\n") == 0) + break; + } + } else { + r = fgets(buf, sizeof(buf), l->f); /* skip line with time */ + if (r == NULL) { + error = "header reading failed"; + goto error; + } + } return l; error: @@ -655,39 +786,32 @@ open_for_write(struct recovery_state *recover, struct log_io_class *class, i64 l /* this little hole shouldn't be used too much */ int read_log(const char *filename, row_reader reader, - row_handler xlog_handler, row_handler snap_handler, void *state) + row_handler *xlog_handler, row_handler *snap_handler, void *state) { struct log_io_iter i; struct log_io *l; - struct log_io_class class; + struct log_io_class **c; struct tbuf *row; - - memset(&i, 0, sizeof(i)); - memset(&class, 0, sizeof(class)); + row_handler *h; if (strstr(filename, xlog_suffix)) { - class.handler = xlog_handler; - xlog04_class(&class); - } - if (strstr(filename, snap_suffix)) { - class.handler = snap_handler; - snap_class(&class); - if (reader) - class.reader = reader; - } - - if (class.filetype == NULL) { + c = xlog_classes(NULL); + h = xlog_handler; + } else if (strstr(filename, snap_suffix)) { + c = snap_classes(reader, NULL); + h = snap_handler; + } else { say_error("don't know what how to read `%s'", filename); return -1; } - l = open_for_read(NULL, &class, 0, 0, filename); + l = open_for_read(NULL, c, 0, 0, filename); iter_open(l, &i, read_rows); while ((row = iter_inner(&i, (void *)1))) - class.handler(state, row); + h(state, row); if (i.error != 0) - say_error("log `%s' wasn't correctly closed", filename); + say_error("binary log `%s' wasn't correctly closed", filename); close_iter(&i); return i.error; @@ -709,14 +833,14 @@ recover_snap(struct recovery_state *r) goto out; } - lsn = greatest_lsn(&r->snap_class); + lsn = greatest_lsn(r->snap_prefered_class); if (lsn <= 0) { say_error("can't find snapshot"); goto out; } - snap = open_for_read(r, &r->snap_class, lsn, 0, NULL); + snap = open_for_read(r, r->snap_class, lsn, 0, NULL); if (snap == NULL) { say_error("can't find/open snapshot"); goto out; @@ -726,7 +850,7 @@ recover_snap(struct recovery_state *r) say_info("recover from `%s'", snap->filename); while ((row = iter_inner(&i, (void *)1))) { - if (snap->class->handler(r, row) < 0) { + if (r->snap_row_handler(r, row) < 0) { result = -1; goto out; } @@ -768,14 +892,14 @@ recover_wal(struct recovery_state *r, struct log_io *l) iter_open(l, &i, read_rows); while ((row = iter_inner(&i, (void *)1))) { - i64 lsn = l->class->row_lsn(row); + i64 lsn = row_v11(row)->lsn; if (r && lsn <= confirmed_lsn(r)) { say_debug("skipping too young row"); continue; } /* after handler(r, row) returned, row may be modified, do not use it */ - if (l->class->handler(r, row) < 0) { + if (r->wal_row_handler(r, row) < 0) { say_error("row_handler returned error"); result = -1; goto out; @@ -823,7 +947,7 @@ recover_remaining_wals(struct recovery_state *r) size_t rows_before; current_lsn = confirmed_lsn(r) + 1; - wal_greatest_lsn = greatest_lsn(&r->wal_class); + wal_greatest_lsn = greatest_lsn(r->wal_prefered_class); /* if the caller already opened WAL for us, recover from it first */ if (r->current_wal != NULL) @@ -844,7 +968,7 @@ recover_remaining_wals(struct recovery_state *r) } current_lsn = confirmed_lsn(r) + 1; /* TODO: find better way looking for next xlog */ - next_wal = open_for_read(r, &r->wal_class, current_lsn, suffix, NULL); + next_wal = open_for_read(r, r->wal_class, current_lsn, suffix, NULL); if (next_wal == NULL) { if (suffix++ < 10) continue; @@ -875,11 +999,12 @@ recover_remaining_wals(struct recovery_state *r) if (suffix++ < 10) continue; - say_error("too many filename confilcters"); + say_error("too many filename conflicters"); result = -1; break; } else { - name = format_filename(NULL, &r->wal_class, current_lsn, suffix + 1); + name = format_filename(NULL, r->wal_prefered_class, + current_lsn, suffix + 1); if (access(name, F_OK) == 0) { say_error("found conflicter `%s' after successful reading", name); result = -1; @@ -922,9 +1047,8 @@ recover(struct recovery_state *r, i64 lsn) if (lsn == 0) { result = recover_snap(r); if (result < 0) { - if (greatest_lsn(&r->snap_class) <= 0) { - say_crit - ("don't you forget to initialize storage with --init_storage switch?"); + if (greatest_lsn(r->snap_prefered_class) <= 0) { + say_crit("don't you forget to initialize storage with --init_storage switch?"); _exit(1); } panic("snapshot recovery failed"); @@ -943,13 +1067,13 @@ recover(struct recovery_state *r, i64 lsn) */ if (r->current_wal == NULL) { i64 next_lsn = confirmed_lsn(r) + 1; - i64 lsn = find_including_file(&r->wal_class, next_lsn); + i64 lsn = find_including_file(r->wal_prefered_class, next_lsn); if (lsn <= 0) { say_error("can't find wal containing record with lsn:%" PRIi64, next_lsn); result = -1; goto out; } else { - r->current_wal = open_for_read(r, &r->wal_class, lsn, 0, NULL); + r->current_wal = open_for_read(r, r->wal_class, lsn, 0, NULL); if (r->current_wal == NULL) { result = -1; goto out; @@ -958,6 +1082,8 @@ recover(struct recovery_state *r, i64 lsn) } result = recover_remaining_wals(r); + if (result < 0) + panic("recover failed"); say_info("wals recovered, confirmed lsn: %" PRIi64, confirmed_lsn(r)); out: prelease(fiber->pool); @@ -990,7 +1116,7 @@ recover_follow_file(ev_stat *w, int revents __unused__) int result; result = recover_wal(r, r->current_wal); if (result < 0) - panic("recover failed: %i", result); + panic("recover failed"); if (result == LOG_EOF) { say_info("done `%s' confirmed_lsn:%" PRIi64, r->current_wal->filename, confirmed_lsn(r)); @@ -1002,9 +1128,9 @@ recover_follow_file(ev_stat *w, int revents __unused__) void recover_follow(struct recovery_state *r, ev_tstamp wal_dir_rescan_delay) { - ev_timer_init(&r->wal_class.timer, recover_follow_dir, + ev_timer_init(&r->wal_timer, recover_follow_dir, wal_dir_rescan_delay, wal_dir_rescan_delay); - ev_timer_start(&r->wal_class.timer); + ev_timer_start(&r->wal_timer); if (r->current_wal != NULL) { ev_stat *stat = &r->current_wal->stat; ev_stat_init(stat, recover_follow_file, r->current_wal->filename, 0.); @@ -1016,8 +1142,9 @@ void recover_finalize(struct recovery_state *r) { int result; - if (ev_is_active(&r->wal_class.timer)) - ev_timer_stop(&r->wal_class.timer); + + if (ev_is_active(&r->wal_timer)) + ev_timer_stop(&r->wal_timer); if (r->current_wal != NULL) { if (ev_is_active(&r->current_wal->stat)) @@ -1029,8 +1156,7 @@ recover_finalize(struct recovery_state *r) panic("unable to scucessfully finalize recovery"); if (r->current_wal != NULL && result != LOG_EOF) { - say_warn("wal `%s' wasn't correctly closed", - r->current_wal->filename); + say_warn("wal `%s' wasn't correctly closed", r->current_wal->filename); close_log(&r->current_wal); } } @@ -1042,12 +1168,11 @@ wal_write_request(const struct tbuf *t) } static struct tbuf * -write_to_disk_v04(void *_state, struct tbuf *t) +write_to_disk(void *_state, struct tbuf *t) { static struct log_io *wal = NULL, *wal_to_close = NULL; static size_t rows = 0; - struct tbuf *reply; - u32 calculated_crc; + struct tbuf *reply, *header; struct recovery_state *r = _state; u32 result = 0; int suffix = 0; @@ -1063,7 +1188,7 @@ write_to_disk_v04(void *_state, struct tbuf *t) /* if there is filename conflict, try filename with lager suffix */ while (wal == NULL && suffix < 10) { - wal = open_for_write(r, &r->wal_class, wal_write_request(t)->lsn, suffix); + wal = open_for_write(r, r->wal_prefered_class, wal_write_request(t)->lsn, suffix); suffix++; } if (wal_to_close != NULL) { @@ -1078,14 +1203,27 @@ write_to_disk_v04(void *_state, struct tbuf *t) say_syserror("can't write marker to wal"); goto fail; } - if (fwrite(wal_write_request(t)->data, wal_write_request(t)->len, 1, wal->f) != 1) { - say_syserror("can't write data to wal"); + + header = tbuf_alloc(t->pool); + tbuf_ensure(header, sizeof(struct row_v11)); + header->len = sizeof(struct row_v11); + + row_v11(header)->lsn = wal_write_request(t)->lsn; + row_v11(header)->tm = ev_now(); + row_v11(header)->len = wal_write_request(t)->len; + row_v11(header)->data_crc32c = + crc32c(0, wal_write_request(t)->data, wal_write_request(t)->len); + row_v11(header)->header_crc32c = + crc32c(0, header->data + field_sizeof(struct row_v11, header_crc32c), + sizeof(struct row_v11) - field_sizeof(struct row_v11, header_crc32c)); + + if (fwrite(header->data, header->len, 1, wal->f) != 1) { + say_syserror("can't write row header to wal"); goto fail; } - calculated_crc = crc32(wal_write_request(t)->data, wal_write_request(t)->len); - if (fwrite(&calculated_crc, sizeof(calculated_crc), 1, wal->f) != 1) { - say_syserror("can't write crc to wal"); + if (fwrite(wal_write_request(t)->data, wal_write_request(t)->len, 1, wal->f) != 1) { + say_syserror("can't write row data to wal"); goto fail; } @@ -1137,50 +1275,29 @@ wal_write(struct recovery_state *r, i64 lsn, struct tbuf *data) return reply == 0; } -bool -wal_write_v04(struct recovery_state *r, int op, const u8 *data, size_t len) -{ - i64 lsn = next_lsn(r, 0); - struct tbuf *m = tbuf_alloc(fiber->pool); - tbuf_reserve(m, sizeof(struct row_v04) + len); - row_v04(m)->lsn = lsn; - row_v04(m)->type = op; - row_v04(m)->len = len; - memcpy(row_v04(m)->data, data, row_v04(m)->len); - - if (wal_write(r, lsn, m)) { - confirm_lsn(r, lsn); - return true; - } - - say_warn("wal_write failed, txn lsn:%" PRIi64 " aborted", lsn); - return false; -} - struct recovery_state * recover_init(const char *snap_dirname, const char *wal_dirname, row_reader snap_row_reader, row_handler snap_row_handler, row_handler wal_row_handler, int rows_per_file, double fsync_delay, double snap_io_rate_limit, int inbox_size, int flags, void *data) { - struct recovery_state *r = malloc(sizeof(*r)); /* let it leak */ - memset(r, 0, sizeof(*r)); + struct recovery_state *r = p0alloc(eter_pool, sizeof(*r)); - snap_class(&r->snap_class); - r->snap_class.dirname = snap_dirname; - r->snap_class.reader = snap_row_reader; - r->snap_class.handler = snap_row_handler; + r->wal_timer.data = r; + r->snap_row_handler = snap_row_handler; + r->wal_row_handler = wal_row_handler; + r->data = data; - xlog04_class(&r->wal_class); - r->wal_class.dirname = wal_dirname; - r->wal_class.handler = wal_row_handler; - r->wal_class.timer.data = r; - r->wal_class.rows_per_file = rows_per_file; - r->wal_class.fsync_delay = fsync_delay; + r->snap_class = snap_classes(snap_row_reader, snap_dirname); + r->snap_prefered_class = r->snap_class[1]; + + r->wal_class = xlog_classes(wal_dirname); + r->wal_prefered_class = r->wal_class[1]; + r->wal_prefered_class->rows_per_file = rows_per_file; + r->wal_prefered_class->fsync_delay = fsync_delay; - r->data = data; if ((flags & RECOVER_READONLY) == 0) { - r->wal_writer = spawn_child("wal_writer", inbox_size, write_to_disk_v04, r); + r->wal_writer = spawn_child("wal_writer", inbox_size, write_to_disk, r); r->snap_io_rate_limit = snap_io_rate_limit * 1024 * 1024; } return r; @@ -1190,19 +1307,35 @@ static void write_rows(struct log_io_iter *i) { struct log_io *l = i->log; - struct tbuf *row; + struct tbuf *row, *data; + + row = tbuf_alloc(eter_pool); + tbuf_ensure(row, sizeof(struct row_v11)); + row->len = sizeof(struct row_v11); goto start; for (;;) { coro_transfer(&i->coro.ctx, &fiber->coro.ctx); start: - row = i->to; + data = i->to; if (fwrite(&l->class->marker, l->class->marker_size, 1, l->f) != 1) panic("fwrite"); + row_v11(row)->lsn = 0; /* unused */ + row_v11(row)->tm = ev_now(); + row_v11(row)->len = data->len; + row_v11(row)->data_crc32c = crc32c(0, data->data, data->len); + row_v11(row)->header_crc32c = + crc32c(0, row->data + field_sizeof(struct row_v11, header_crc32c), + sizeof(struct row_v11) - field_sizeof(struct row_v11, + header_crc32c)); + if (fwrite(row->data, row->len, 1, l->f) != 1) panic("fwrite"); + + if (fwrite(data->data, data->len, 1, l->f) != 1) + panic("fwrite"); } } @@ -1248,7 +1381,7 @@ snapshot_save(struct recovery_state *r, void (*f) (struct log_io_iter *)) memset(&i, 0, sizeof(i)); - snap = open_for_write(r, &r->snap_class, confirmed_lsn(r), -1); + snap = open_for_write(r, r->snap_prefered_class, confirmed_lsn(r), -1); if (snap == NULL) panic("can't open snap for writing"); diff --git a/core/log_io_internal.h b/core/log_io_internal.h index 56654532c6b24fca5c7227439c35a0c20f3e90ce..056d3389edc59eeee2cd7f8d14343ac783be475f 100644 --- a/core/log_io_internal.h +++ b/core/log_io_internal.h @@ -33,11 +33,7 @@ enum log_mode { }; struct log_io_class { - ev_timer timer; - - i64(*row_lsn) (const struct tbuf *); - row_handler handler; - row_reader reader; + row_reader *reader; u64 marker, eof_marker; size_t marker_size, eof_marker_size; size_t rows_per_file; @@ -64,10 +60,19 @@ struct recovery_state { i64 lsn, confirmed_lsn; struct log_io *current_wal; /* the WAL we'r currently reading/writing from/to */ - struct log_io_class snap_class, wal_class; + struct log_io_class **snap_class, **wal_class, *snap_prefered_class, *wal_prefered_class; struct child *wal_writer; - void *data; + + /* handlers will be presented by most new format of data + log_io_class->reader is responsible of converting data from old format */ + row_handler *wal_row_handler, *snap_row_handler; + ev_timer wal_timer; + ev_tstamp recovery_lag; + int snap_io_rate_limit; + + /* pointer to user supplied custom data */ + void *data; }; struct wal_write_request { diff --git a/core/log_io_remote.c b/core/log_io_remote.c index 73954ee993e05cee2f068f57a947df47cc4d7b20..cc0d6b7646c2a53cd2de08d46a7a185e4029820e 100644 --- a/core/log_io_remote.c +++ b/core/log_io_remote.c @@ -38,94 +38,147 @@ #include <log_io.h> #include "log_io_internal.h" +struct remote_state { + struct recovery_state *r; + int (*handler) (struct recovery_state *r, struct tbuf *row); +}; + static struct tbuf * -row_reader_v04(struct palloc_pool *pool) +row_reader_v11(struct palloc_pool *pool) { - const int header_size = offsetof(struct row_v04, data); + const int header_size = sizeof(struct row_v11); struct tbuf *m = tbuf_alloc(pool); + tbuf_ensure(m, header_size); if (fiber_read(m->data, header_size) != header_size) { say_error("unexpected eof reading row header"); return NULL; } - m->len = header_size; + tbuf_ensure(m, header_size + row_v11(m)->len); + m->len = header_size + row_v11(m)->len; - tbuf_ensure(m, header_size + row_v04(m)->len); - if (fiber_read(row_v04(m)->data, row_v04(m)->len) != row_v04(m)->len) { + if (fiber_read(row_v11(m)->data, row_v11(m)->len) != row_v11(m)->len) { say_error("unexpected eof reading row body"); return NULL; } - m->len += row_v04(m)->len; say_debug("read row bytes:%" PRIu32 " %s", m->len, tbuf_to_hex(m)); return m; } -static void -pull_from_remote(void *state) +static struct tbuf * +remote_read_row(i64 initial_lsn) { - struct recovery_state *r = state; struct tbuf *row; - i64 lsn; - int rows = 0; bool warning_said = false; const int reconnect_delay = 1; + const char *err = NULL; + u32 version; for (;;) { if (fiber->fd < 0) { if (fiber_connect(fiber->data) < 0) { - if (!warning_said) { - say_syserror("can't connect to feeder"); - say_info("will retry every %i second", reconnect_delay); - warning_said = true; - } - fiber_sleep(reconnect_delay); - continue; + err = "can't connect to feeder"; + goto err; } - say_crit("succefully connected to feeder"); - lsn = confirmed_lsn(r) + 1; - fiber_write(&lsn, sizeof(lsn)); + if (fiber_write(&initial_lsn, sizeof(initial_lsn)) != sizeof(initial_lsn)) { + err = "can't write version"; + goto err; + } + + if (fiber_read(&version, sizeof(version)) != sizeof(version)) { + err = "can't read version"; + goto err; + } - say_crit("starting remote recovery from lsn:%" PRIi64, lsn); + if (version != default_version) { + err = "remote version mismatch"; + goto err; + } + + say_crit("succefully connected to feeder"); + say_crit("starting remote recovery from lsn:%" PRIi64, initial_lsn); warning_said = false; + err = NULL; } - row = row_reader_v04(fiber->pool); + row = row_reader_v11(fiber->pool); if (row == NULL) { - fiber_close(); - fiber_sleep(reconnect_delay); - continue; + err = "can't read row"; + goto err; + } + + return row; + + err: + if (err != NULL && !warning_said) { + say_info("%s", err); + say_info("will retry every %i second", reconnect_delay); + warning_said = true; } + fiber_sleep(reconnect_delay); + } +} - if (r->wal_class.handler(r, row) < 0) - panic("replication failure: can't apply row"); +static void +pull_from_remote(void *state) +{ + struct remote_state *h = state; + struct tbuf *row; - if (wal_write(r, r->wal_class.row_lsn(row), row) == false) - panic("replication failure: can't write row to WAL"); + for (;;) { + row = remote_read_row(confirmed_lsn(h->r) + 1); + h->r->recovery_lag = ev_now() - row_v11(row)->tm; - next_lsn(r, r->wal_class.row_lsn(row)); - confirm_lsn(r, r->wal_class.row_lsn(row)); + if (h->handler(h->r, row) < 0) + continue; - if (rows++ % 1000 == 0) { - prelease(fiber->pool); - rows = 0; - } + prelease_after(fiber->pool, 128 * 1024); } } +int +default_remote_row_handler(struct recovery_state *r, struct tbuf *row) +{ + struct tbuf *data; + i64 lsn = row_v11(row)->lsn; + + /* save row data since wal_row_handler may clobber it */ + data = tbuf_alloc(row->pool); + tbuf_append(data, row_v11(row)->data, row_v11(row)->len); + + if (r->wal_row_handler(r, row) < 0) + panic("replication failure: can't apply row"); + + if (wal_write(r, lsn, data) == false) + panic("replication failure: can't write row to WAL"); + + next_lsn(r, lsn); + confirm_lsn(r, lsn); + + return 0; +} + struct fiber * -recover_follow_remote(struct recovery_state *r, char *ip_addr, int port) +recover_follow_remote(struct recovery_state *r, char *ip_addr, int port, + int (*handler) (struct recovery_state *r, struct tbuf *row)) { char *name; struct fiber *f; struct in_addr server; struct sockaddr_in *addr; + struct remote_state *h; say_crit("initializing remote hot standby, WAL feeder %s:%i", ip_addr, port); name = palloc(eter_pool, 64); snprintf(name, 64, "remote_hot_standby/%s:%i", ip_addr, port); - f = fiber_create(name, -1, -1, pull_from_remote, r); + + h = palloc(eter_pool, sizeof(*h)); + h->r = r; + h->handler = handler; + + f = fiber_create(name, -1, -1, pull_from_remote, h); if (f == NULL) return NULL; diff --git a/core/pickle.c b/core/pickle.c index 6e7b140fad6cee607c6bf9c6e0ed2460afe09c72..e0b770ad589f598c9fc32b24b6c2d2589e858a33 100644 --- a/core/pickle.c +++ b/core/pickle.c @@ -79,6 +79,20 @@ write_varint32(struct tbuf *b, u32 value) append_byte(b, (u8)((value) & 0x7F)); } +u16 +read_u16(struct tbuf *b) +{ + if (b->len < 2) + raise(ERR_CODE_UNKNOWN_ERROR, "buffer too short"); + + u16 r = *(u16 *)b->data; /* FIXME: endianess & aligment */ + b->size -= 2; + b->len -= 2; + b->data += 2; + + return r; +} + u32 read_u32(struct tbuf *b) { @@ -146,7 +160,7 @@ read_varint32(struct tbuf *buf) buf->size -= 4; buf->len -= 4; return (b[0] & 0x7f) << 21 | (b[1] & 0x7f) << 14 | - (b[2] & 0x7f) << 7 | (b[3] & 0x7f); + (b[2] & 0x7f) << 7 | (b[3] & 0x7f); } if (len < 5) @@ -156,7 +170,7 @@ read_varint32(struct tbuf *buf) buf->size -= 5; buf->len -= 5; return (b[0] & 0x7f) << 28 | (b[1] & 0x7f) << 21 | - (b[2] & 0x7f) << 14 | (b[3] & 0x7f) << 7 | (b[4] & 0x7f); + (b[2] & 0x7f) << 14 | (b[3] & 0x7f) << 7 | (b[4] & 0x7f); } raise(ERR_CODE_UNKNOWN_ERROR, "imposible happend"); diff --git a/core/stat.c b/core/stat.c index 5f0b45b0df6d7138d77b30401593084611221c35..cae1ff1ef70d925dfe421e3d5dfb1379d9d2e5b3 100644 --- a/core/stat.c +++ b/core/stat.c @@ -43,7 +43,6 @@ static int stats_size = 0; static int stats_max = 0; static int base = 0; - int stat_register(char **name, size_t count) { @@ -78,7 +77,6 @@ stat_collect(int base, int name, i64 value) stats[base + name].value[SECS] += value; } - void stat_print(struct tbuf *buf) { @@ -110,6 +108,9 @@ stat_print(struct tbuf *buf) void stat_age(ev_timer *timer, int events __unused__) { + if (stats == NULL) + return; + for (int i = 0; stats[i].name != NULL; i++) { for (int j = 0; j < SECS - 1; j++) stats[i].value[j + 1] = stats[i].value[j]; diff --git a/core/tarantool.c b/core/tarantool.c index 3d1023ae01ed249f398baf8ce8299e23fe3a61c6..ef5a5d2e07d975f85d589f05e4b2b0dad4692557 100644 --- a/core/tarantool.c +++ b/core/tarantool.c @@ -111,7 +111,7 @@ sig_int(int signal) { say_info("SIGINT or SIGTERM recieved, terminating"); - if (recovery_state != NULL) { + if (recovery_state !=NULL) { struct child *writer = wal_writer(recovery_state); if (writer && writer->out && writer->out->fd > 0) { close(writer->out->fd); @@ -417,6 +417,7 @@ main(int argc, char **argv) #if defined(UTILITY) initialize_minimal(); + signal_init(); mod_init(); #elif defined(STORAGE) ev_signal *ev_sig; diff --git a/include/log_io.h b/include/log_io.h index 88cdde7427123aafeb5fa96e140fb7ecb88d1132..8643acf57b1e11cc9714796694b039aa5a75ffbd 100644 --- a/include/log_io.h +++ b/include/log_io.h @@ -37,11 +37,13 @@ #define RECOVER_READONLY 1 +extern const u16 default_tag; +extern const u32 default_version; + struct log_io; struct recovery_state; -typedef int (*row_handler) (struct recovery_state *, const struct tbuf *); -typedef struct tbuf *(*row_reader) (FILE *f, struct palloc_pool * pool); - +typedef int (row_handler) (struct recovery_state *, struct tbuf *); +typedef struct tbuf *(row_reader) (FILE *f, struct palloc_pool *pool); struct row_v04 { i64 lsn; /* this used to be tid */ u16 type; @@ -49,12 +51,12 @@ struct row_v04 { u8 data[]; } __packed__; -struct row_v05 { +struct row_v11 { + u32 header_crc32c; i64 lsn; double tm; u32 len; u32 data_crc32c; - u32 header_crc32c; u8 data[]; } __packed__; @@ -63,11 +65,13 @@ static inline struct row_v04 *row_v04(const struct tbuf *t) return (struct row_v04 *)t->data; } -static inline struct row_v05 *row_v05(const struct tbuf *t) +static inline struct row_v11 *row_v11(const struct tbuf *t) { - return (struct row_v05 *)t->data; + return (struct row_v11 *)t->data; } +struct tbuf *convert_to_v11(struct tbuf *orig, i64 lsn); + struct recovery_state *recover_init(const char *snap_dirname, const char *xlog_dirname, row_reader snap_row_reader, row_handler snap_row_handler, row_handler xlog_row_handler, int rows_per_file, @@ -76,7 +80,7 @@ struct recovery_state *recover_init(const char *snap_dirname, const char *xlog_d int recover(struct recovery_state *, i64 lsn); void recover_follow(struct recovery_state *r, ev_tstamp wal_dir_rescan_delay); void recover_finalize(struct recovery_state *r); -bool wal_write_v04(struct recovery_state *r, int op, const u8 *data, size_t len); +bool wal_write(struct recovery_state *r, i64 lsn, struct tbuf *data); /* recovery accessors */ struct palloc_pool *recovery_pool(struct recovery_state *r); @@ -88,7 +92,9 @@ struct child *wal_writer(struct recovery_state *r); int read_log(const char *filename, row_reader reader, row_handler xlog_handler, row_handler snap_handler, void *state); -struct fiber *recover_follow_remote(struct recovery_state *r, char *ip_addr, int port); +int default_remote_row_handler(struct recovery_state *r, struct tbuf *row); +struct fiber *recover_follow_remote(struct recovery_state *r, char *ip_addr, int port, + int (*handler) (struct recovery_state *r, struct tbuf *row)); struct log_io_iter; void snapshot_write_row(struct log_io_iter *i, struct tbuf *row); diff --git a/include/pickle.h b/include/pickle.h index 3f74c898b68b1474b999bdb3ad5dbba7f3cd795d..250ba240adeb1157b4babe14b41b7aa28adb5d4a 100644 --- a/include/pickle.h +++ b/include/pickle.h @@ -33,6 +33,7 @@ u8 *save_varint32(u8 *target, u32 value); void write_varint32(struct tbuf *b, u32 value); u8 read_u8(struct tbuf *b); +u16 read_u16(struct tbuf *b); u32 read_u32(struct tbuf *b); u32 read_varint32(struct tbuf *buf); void *read_field(struct tbuf *buf); @@ -62,12 +63,12 @@ inline static u32 load_varint32(void **data) if (!(b[3] & 0x80)) { *data += 4; return (b[0] & 0x7f) << 21 | (b[1] & 0x7f) << 14 | - (b[2] & 0x7f) << 7 | (b[3] & 0x7f); + (b[2] & 0x7f) << 7 | (b[3] & 0x7f); } if (!(b[4] & 0x80)) { *data += 5; return (b[0] & 0x7f) << 28 | (b[1] & 0x7f) << 21 | - (b[2] & 0x7f) << 14 | (b[3] & 0x7f) << 7 | (b[4] & 0x7f); + (b[2] & 0x7f) << 14 | (b[3] & 0x7f) << 7 | (b[4] & 0x7f); } assert(false); diff --git a/include/util.h b/include/util.h index 54b4eb58a557b28fd933d8b355495d57c5c1b86b..bc7e6f5370cf8a21793016ffcd03b56baf2096be 100644 --- a/include/util.h +++ b/include/util.h @@ -47,10 +47,12 @@ #define PRI_SZ "lu" #define PRI_SSZ "ld" #define PRI_OFFT "lu" +#define PRI_XFFT "lx" #else #define PRI_SZ "u" #define PRI_SSZ "d" #define PRI_OFFT "llu" +#define PRI_XFFT "llx" #endif #define nelem(x) (sizeof((x))/sizeof((x)[0])) diff --git a/mod/feeder/feeder.c b/mod/feeder/feeder.c index cf8e86bf0aec7188bcc6c0e90381259d80bf600d..04ff9f265a87c5f78ba71d3727eba01ebac7dda9 100644 --- a/mod/feeder/feeder.c +++ b/mod/feeder/feeder.c @@ -34,7 +34,7 @@ static char *custom_proc_title; static int -send_row(struct recovery_state *r __unused__, const struct tbuf *t) +send_row(struct recovery_state *r __unused__, struct tbuf *t) { u8 *data = t->data; ssize_t bytes, len = t->len; @@ -57,6 +57,7 @@ static void recover_feed_slave(int sock) { struct recovery_state *log_io; + struct tbuf *ver; i64 lsn; ssize_t r; @@ -74,6 +75,10 @@ recover_feed_slave(int sock) exit(EXIT_SUCCESS); } + ver = tbuf_alloc(fiber->pool); + tbuf_append(ver, &default_version, sizeof(default_version)); + send_row(NULL, ver); + log_io = recover_init(NULL, cfg.wal_feeder_dir, NULL, NULL, send_row, 0, 0, 0, 64, RECOVER_READONLY, false); diff --git a/mod/silverbox/box.c b/mod/silverbox/box.c index ba35ecf692149d631e20892903f2c115501332c1..2c601f687d90760c03ab216cdf2be855ce20d56a 100644 --- a/mod/silverbox/box.c +++ b/mod/silverbox/box.c @@ -1109,12 +1109,11 @@ op_is_select(u32 op) } u32 -box_dispach(struct box_txn *txn, enum box_mode mode, u32 op, struct tbuf *data) +box_dispach(struct box_txn *txn, enum box_mode mode, u16 op, struct tbuf *data) { u32 cardinality; int ret_code; - void *data__data = data->data; - u32 data__len = data->len; + struct tbuf req = { .data = data->data, .len = data->len }; int saved_iov_cnt = fiber->iov_cnt; ev_tstamp start = ev_now(), stop; @@ -1200,13 +1199,16 @@ box_dispach(struct box_txn *txn, enum box_mode mode, u32 op, struct tbuf *data) if (ret_code == -1) { if (!txn->in_recover) { struct tbuf *t = tbuf_alloc(fiber->pool); + tbuf_append(t, &default_tag, sizeof(default_tag)); tbuf_append(t, &op, sizeof(op)); - tbuf_append(t, data__data, data__len); + tbuf_append(t, req.data, req.len); - if (!wal_write_v04(recovery_state, op, data__data, data__len)) { + i64 lsn = next_lsn(recovery_state, 0); + if (!wal_write(recovery_state, lsn, t)) { ret_code = ERR_CODE_UNKNOWN_ERROR; goto abort; } + confirm_lsn(recovery_state, lsn); } txn_commit(txn); @@ -1321,14 +1323,24 @@ box_snap_reader(FILE *f, struct palloc_pool *pool) if (fread(box_snap_row(row)->data, box_snap_row(row)->data_size, 1, f) != 1) return NULL; - return row; + return convert_to_v11(row, 0); } static int -snap_apply(struct recovery_state *r __unused__, const struct tbuf *t) +snap_apply(struct recovery_state *r __unused__, struct tbuf *t) { - struct box_snap_row *row = box_snap_row(t); + struct box_snap_row *row; struct box_txn *txn = txn_alloc(0); + + /* drop wal header */ + if (tbuf_peek(t, sizeof(struct row_v11)) == NULL) + return -1; + + u16 tag = read_u16(t); + if (tag != 0) + return -1; + + row = box_snap_row(t); txn->in_recover = true; txn->n = row->namespace; @@ -1358,19 +1370,21 @@ snap_apply(struct recovery_state *r __unused__, const struct tbuf *t) } static int -xlog_apply(struct recovery_state *r __unused__, const struct tbuf *t) +xlog_apply(struct recovery_state *r __unused__, struct tbuf *t) { - struct row_v04 *row = row_v04(t); struct box_txn *txn = txn_alloc(0); txn->in_recover = true; - assert(row->lsn > confirmed_lsn(r)); + /* drop wal header */ + if (tbuf_peek(t, sizeof(struct row_v11)) == NULL) + return -1; - struct tbuf *b = palloc(fiber->pool, sizeof(*b)); - b->data = row->data; - b->len = row->len; + u16 tag = read_u16(t); + if (tag != 0) + return -1; - if (box_dispach(txn, RW, row->type, b) != 0) + u16 type = read_u16(t); + if (box_dispach(txn, RW, type, t) != 0) return -1; txn_cleanup(txn); @@ -1378,7 +1392,7 @@ xlog_apply(struct recovery_state *r __unused__, const struct tbuf *t) } static int -snap_print(struct recovery_state *r __unused__, const struct tbuf *t) +snap_print(struct recovery_state *r __unused__, struct tbuf *t) { struct tbuf *out = tbuf_alloc(t->pool); struct box_snap_row *row = box_snap_row(t); @@ -1389,7 +1403,7 @@ snap_print(struct recovery_state *r __unused__, const struct tbuf *t) } static int -xlog_print(struct recovery_state *r __unused__, const struct tbuf *t) +xlog_print(struct recovery_state *r __unused__, struct tbuf *t) { struct tbuf *out = tbuf_alloc(t->pool); int res = box_xlog_sprint(out, t); @@ -1587,7 +1601,7 @@ box_bound_to_primary(void *data __unused__) status = palloc(eter_pool, 64); snprintf(status, 64, "hot_standby/%s:%i%s", cfg.wal_feeder_ipaddr, cfg.wal_feeder_port, custom_proc_title); - recover_follow_remote(recovery_state, cfg.wal_feeder_ipaddr, cfg.wal_feeder_port); + recover_follow_remote(recovery_state, cfg.wal_feeder_ipaddr, cfg.wal_feeder_port, default_remote_row_handler); title("hot_standby/%s:%i", cfg.wal_feeder_ipaddr, cfg.wal_feeder_port); } else { @@ -1822,6 +1836,7 @@ mod_snapshot(struct log_io_iter *i) header.data_size = tuple->bsize; tbuf_reset(row); + tbuf_append(row, &default_tag, sizeof(default_tag)); tbuf_append(row, &header, sizeof(header)); tbuf_append(row, tuple->data, tuple->bsize); diff --git a/mod/silverbox/box.h b/mod/silverbox/box.h index 6085fc450b6e6b65adf47eb84b5ac8dbc598659b..7b5228cb3f06f1109d1b0b4b9b55d405d3716a27 100644 --- a/mod/silverbox/box.h +++ b/mod/silverbox/box.h @@ -179,7 +179,7 @@ ENUM(messages, MESSAGES); struct box_tuple *index_find(struct index *index, void *key); struct box_txn *txn_alloc(u32 flags); -u32 box_dispach(struct box_txn *txn, enum box_mode mode, u32 op, struct tbuf *data); +u32 box_dispach(struct box_txn *txn, enum box_mode mode, u16 op, struct tbuf *data); void tuple_txn_ref(struct box_txn *txn, struct box_tuple *tuple); void txn_cleanup(struct box_txn *txn);