diff --git a/include/exception.h b/include/exception.h index 49e8e8591471c205927235d5cf5722560d5e880d..dc4f63a3e76dd62aac831856f15a8412adfeed6f 100644 --- a/include/exception.h +++ b/include/exception.h @@ -48,6 +48,23 @@ @end +/** Internal error resulting from a failed system call. + */ +@interface SystemError: tnt_Exception { + @public + /* errno code */ + int errnum; + /* error description */ + char errmsg[TNT_ERRMSG_MAX]; +} + +- (id) init: (const char *)msg, ...; +- (id) init: (int)errnum_arg: (const char *)format, ...; +- (id) init: (int)errnum_arg: (const char *)format: (va_list)ap; +- (void) log; +@end + + /** Errors that should make it to the client. */ @interface ClientError: tnt_Exception { diff --git a/include/log_io.h b/include/log_io.h index 88929439c6e7136f765d2eb3502541b2c9d73cfa..2bbd9bd0a0c3acdbbe729f14a9c34f3ecd00f2c4 100644 --- a/include/log_io.h +++ b/include/log_io.h @@ -45,7 +45,7 @@ extern const u64 default_cookie; extern const u32 default_version; struct recovery_state; -typedef int (row_handler) (struct recovery_state *, struct tbuf *); +typedef int (row_handler)(struct tbuf *); typedef struct tbuf *(row_reader) (FILE *f, struct palloc_pool *pool); enum log_mode { @@ -193,7 +193,7 @@ int64_t next_lsn(struct recovery_state *r, i64 new_lsn); void recovery_wait_lsn(struct recovery_state *r, i64 lsn); int read_log(const char *filename, - row_handler xlog_handler, row_handler snap_handler, void *state); + row_handler xlog_handler, row_handler snap_handler); void recovery_follow_remote(struct recovery_state *r, const char *remote); void recovery_stop_remote(struct recovery_state *r); diff --git a/mod/box/box.m b/mod/box/box.m index a2851f22d5750c316e953253c210728d152e5ea5..3939ca15df3910fec07e40dcc360f8b6967413e5 100644 --- a/mod/box/box.m +++ b/mod/box/box.m @@ -214,7 +214,7 @@ box_xlog_sprint(struct tbuf *buf, const struct tbuf *t) static int -snap_print(struct recovery_state *r __attribute__((unused)), struct tbuf *t) +snap_print(struct tbuf *t) { struct tbuf *out = tbuf_alloc(t->pool); struct box_snap_row *row; @@ -235,7 +235,7 @@ snap_print(struct recovery_state *r __attribute__((unused)), struct tbuf *t) } static int -xlog_print(struct recovery_state *r __attribute__((unused)), struct tbuf *t) +xlog_print(struct tbuf *t) { struct tbuf *out = tbuf_alloc(t->pool); int res = box_xlog_sprint(out, t); @@ -262,7 +262,7 @@ convert_snap_row_to_wal(struct tbuf *t) } static int -recover_row(struct recovery_state *r __attribute__((unused)), struct tbuf *t) +recover_row(struct tbuf *t) { /* drop wal header */ if (tbuf_peek(t, sizeof(struct row_v11)) == NULL) @@ -519,7 +519,7 @@ mod_init(void) int mod_cat(const char *filename) { - return read_log(filename, xlog_print, snap_print, NULL); + return read_log(filename, xlog_print, snap_print); } static void diff --git a/src/exception.m b/src/exception.m index 420637094c014407aef7a09efabdb5bb007aa53c..88cb63d2ae967df5a3518c39173b414ceea82af4 100644 --- a/src/exception.m +++ b/src/exception.m @@ -46,6 +46,42 @@ @end +@implementation SystemError + +- (id) init: (const char *) format, ... +{ + va_list ap; + va_start(ap, format); + [self init: errno :format :ap]; + va_end(ap); + return self; +} + +- (id) init: (int)errnum_arg: (const char *)format, ... +{ + va_list ap; + va_start(ap, format); + [self init: errnum_arg :format: ap]; + va_end(ap); + return self; +} + +- (id) init: (int)errnum_arg :(const char *)format: (va_list)ap +{ + self = [super init]; + errnum = errnum_arg; + vsnprintf(errmsg, sizeof(errmsg), format, ap); + return self; +} + +- (void) log +{ + say(S_ERROR, strerror(errnum), "%s", errmsg); +} + +@end + + @implementation ClientError - (id) init: (uint32_t)errcode_, ... { diff --git a/src/log_io.m b/src/log_io.m index 83f27d45f9274886a754803360e24a865da2a44e..f11229a3649f0a9cc958de580985a8b78aaa24f0 100644 --- a/src/log_io.m +++ b/src/log_io.m @@ -123,7 +123,7 @@ static const int HEADER_SIZE_MAX = sizeof(v11) + sizeof(snap_mark) + 2; struct recovery_state *recovery_state; -enum suffix { NONE, INPROGRESS, ANY }; +enum suffix { NONE, INPROGRESS }; #define ROW_EOF (void *)1 @@ -251,7 +251,6 @@ v11_class(struct log_io_class *c) c->filename_ext = xlog_ext; c->filetype = xlog_mark; c->version = v11; - c->reader = row_reader_v11; c->marker = marker_v11; c->marker_size = sizeof(marker_v11); c->eof_marker = eof_marker_v11; @@ -363,7 +362,7 @@ read_rows(struct log_io_iter *i) marker_offset - good_offset, good_offset); say_debug("magic found at 0x%08" PRI_XFFT, marker_offset); - row = l->class->reader(l->f, fiber->gc_pool); + row = row_reader_v11(l->f, fiber->gc_pool); if (row == ROW_EOF) goto eof; @@ -869,7 +868,7 @@ error: */ int read_log(const char *filename, - row_handler *xlog_handler, row_handler *snap_handler, void *state) + row_handler *xlog_handler, row_handler *snap_handler) { struct log_io_iter i; struct log_io *l; @@ -892,7 +891,7 @@ read_log(const char *filename, l = log_io_open(c, LOG_READ, filename, NONE, f); iter_open(l, &i, read_rows); while ((row = iter_inner(&i, (void *)1))) - h(state, row); + h(row); if (i.error != 0) say_error("binary log `%s' wasn't correctly closed", filename); @@ -936,7 +935,7 @@ recover_snap(struct recovery_state *r) say_info("recover from `%s'", snap->filename); while ((row = iter_inner(&i, (void *)1))) { - if (r->row_handler(r, row) < 0) { + if (r->row_handler(row) < 0) { say_error("can't apply row"); return -1; } @@ -987,21 +986,19 @@ recover_wal(struct recovery_state *r, struct log_io *l) while ((row = iter_inner(&i, (void *)1))) { i64 lsn = row_v11(row)->lsn; - if (r && lsn <= r->confirmed_lsn) { + if (lsn <= r->confirmed_lsn) { say_debug("skipping too young row"); continue; } /* after handler(r, row) returned, row may be modified, do not use it */ - if (r->row_handler(r, row) < 0) { + if (r->row_handler(row) < 0) { say_error("can't apply row"); return -1; } - if (r) { - next_lsn(r, lsn); - confirm_lsn(r, lsn); - } + next_lsn(r, lsn); + confirm_lsn(r, lsn); } if (i.error != 0) { @@ -1587,14 +1584,20 @@ write_to_disk(struct recovery_state *r, struct wal_write_request *req) /* Flush stdio buffer to keep replication in sync. */ if (is_bulk_end && fflush(wal->f) < 0) { - say_syserror("can't flush WAL"); + say_syserror("%s: fflush() failed", wal->filename); goto fail; } if (r->wal_fsync_delay > 0 && ev_now() - last_flush >= r->wal_fsync_delay) { if (log_io_flush(wal) < 0) { - say_syserror("can't flush WAL"); + say_syserror("%s: fsync() failed", wal->filename); + /* + * XXX: we don't really know how many + * records were not written to disk: + * probably way more than the current + * one. + */ goto fail; } last_flush = ev_now(); diff --git a/src/log_io_remote.m b/src/log_io_remote.m index d099afb6a1f7ba41870813800cf243678780587f..436ca7595e5a3555c53eb491135d24e9403867ef 100644 --- a/src/log_io_remote.m +++ b/src/log_io_remote.m @@ -155,7 +155,7 @@ remote_apply_row(struct recovery_state *r, struct tbuf *row) data = tbuf_alloc(row->pool); tbuf_append(data, row_v11(row)->data, row_v11(row)->len); - if (r->row_handler(r, row) < 0) + if (r->row_handler(row) < 0) panic("replication failure: can't apply row"); tag = read_u16(data); diff --git a/src/replication.m b/src/replication.m index e54c151b178723033968eab432c6f7ed6865069d..60518034c58fd43d5343b706362df8261af5841b 100644 --- a/src/replication.m +++ b/src/replication.m @@ -136,7 +136,7 @@ replication_relay_recv(struct ev_io *w, int revents); /** Send a single row to the client. */ static int -replication_relay_send_row(struct recovery_state *r __attribute__((unused)), struct tbuf *t); +replication_relay_send_row(struct tbuf *t); /* @@ -601,7 +601,7 @@ replication_relay_loop(int client_sock) ver = tbuf_alloc(fiber->gc_pool); tbuf_append(ver, &default_version, sizeof(default_version)); - replication_relay_send_row(NULL, ver); + replication_relay_send_row(ver); /* init libev events handlers */ ev_default_loop(0); @@ -648,7 +648,7 @@ replication_relay_recv(struct ev_io *w, int __attribute__((unused)) revents) /** Send to row to client. */ static int -replication_relay_send_row(struct recovery_state *r __attribute__((unused)), struct tbuf *t) +replication_relay_send_row(struct tbuf *t) { u8 *data = t->data; ssize_t bytes, len = t->size;