diff --git a/include/recovery.h b/include/recovery.h index 27e9e0af8e80ce2e03952f679d786ca8ce2aafe1..aaffab17f5bed002eb3917936e3d8ff9d1cabdc7 100644 --- a/include/recovery.h +++ b/include/recovery.h @@ -39,7 +39,7 @@ struct tbuf; #define RECOVER_READONLY 1 -typedef int (row_handler)(struct tbuf *); +typedef int (row_handler)(void *, struct tbuf *); /** A "condition variable" that allows fibers to wait when a given * LSN makes it to disk. @@ -93,6 +93,7 @@ struct recovery_state { * formats. */ row_handler *row_handler; + void *row_handler_param; int snap_io_rate_limit; int rows_per_wal; int flags; @@ -106,7 +107,7 @@ struct recovery_state { extern struct recovery_state *recovery_state; void recovery_init(const char *snap_dirname, const char *xlog_dirname, - row_handler row_handler, + row_handler row_handler, void *row_handler_param, int rows_per_wal, const char *wal_mode, double wal_fsync_delay, int flags); @@ -131,7 +132,8 @@ void set_lsn(struct recovery_state *r, int64_t lsn); void recovery_wait_lsn(struct recovery_state *r, int64_t lsn); int read_log(const char *filename, - row_handler xlog_handler, row_handler snap_handler); + row_handler xlog_handler, row_handler snap_handler, + void *param); void recovery_follow_remote(struct recovery_state *r, const char *addr); void recovery_stop_remote(struct recovery_state *r); diff --git a/mod/box/box.m b/mod/box/box.m index 7fb633aa71747db0535edb274940327315652c56..55bd73856e9fde38119b89120ab130db1788492f 100644 --- a/mod/box/box.m +++ b/mod/box/box.m @@ -229,7 +229,7 @@ box_xlog_sprint(struct tbuf *buf, const struct tbuf *t) } static int -snap_print(struct tbuf *t) +snap_print(void *param __attribute__((unused)), struct tbuf *t) { @try { struct tbuf *out = tbuf_alloc(t->pool); @@ -253,7 +253,7 @@ snap_print(struct tbuf *t) } static int -xlog_print(struct tbuf *t) +xlog_print(void *param __attribute__((unused)), struct tbuf *t) { @try { struct tbuf *out = tbuf_alloc(t->pool); @@ -283,7 +283,7 @@ recover_snap_row(struct tbuf *t) } static int -recover_row(struct tbuf *t) +recover_row(void *param __attribute__((unused)), struct tbuf *t) { /* drop wal header */ if (tbuf_peek(t, sizeof(struct header_v11)) == NULL) { @@ -483,7 +483,8 @@ mod_init(void) /* recovery initialization */ recovery_init(cfg.snap_dir, cfg.wal_dir, - recover_row, cfg.rows_per_wal, cfg.wal_mode, + recover_row, NULL, + cfg.rows_per_wal, cfg.wal_mode, cfg.wal_fsync_delay, init_storage ? RECOVER_READONLY : 0); recovery_update_io_rate_limit(recovery_state, cfg.snap_io_rate_limit); @@ -538,7 +539,7 @@ mod_init(void) int mod_cat(const char *filename) { - return read_log(filename, xlog_print, snap_print); + return read_log(filename, xlog_print, snap_print, NULL); } static void diff --git a/src/recovery.m b/src/recovery.m index 107b866ac9a30ec0df90965433ac26ce299f9f07..5d1cb4607cbe595987ffc93c70f8c1a836018962 100644 --- a/src/recovery.m +++ b/src/recovery.m @@ -188,8 +188,9 @@ recovery_stop_local(struct recovery_state *r); void recovery_init(const char *snap_dirname, const char *wal_dirname, - row_handler row_handler, int rows_per_wal, - const char *wal_mode, double wal_fsync_delay, int flags) + row_handler row_handler, void *row_handler_param, + int rows_per_wal, const char *wal_mode, + double wal_fsync_delay, int flags) { assert(recovery_state == NULL); recovery_state = p0alloc(eter_pool, sizeof(struct recovery_state)); @@ -200,6 +201,7 @@ recovery_init(const char *snap_dirname, const char *wal_dirname, panic("unacceptable value of 'rows_per_wal'"); r->row_handler = row_handler; + r->row_handler_param = row_handler_param; r->snap_dir = &snap_dir; r->snap_dir->dirname = strdup(snap_dirname); @@ -298,7 +300,7 @@ recover_snap(struct recovery_state *r) struct tbuf *row; while ((row = log_io_cursor_next(&i))) { - if (r->row_handler(row) < 0) { + if (r->row_handler(r->row_handler_param, row) < 0) { say_error("can't apply row"); if (snap->dir->panic_if_error) break; @@ -347,7 +349,7 @@ recover_wal(struct recovery_state *r, struct log_io *l) * After handler(row) returned, row may be * modified, do not use it. */ - if (r->row_handler(row) < 0) { + if (r->row_handler(r->row_handler_param, row) < 0) { say_error("can't apply row"); if (l->dir->panic_if_error) goto end; @@ -1221,7 +1223,8 @@ snapshot_save(struct recovery_state *r, int read_log(const char *filename, - row_handler *xlog_handler, row_handler *snap_handler) + row_handler *xlog_handler, row_handler *snap_handler, + void *param) { struct log_dir *dir; row_handler *h; @@ -1244,7 +1247,7 @@ read_log(const char *filename, log_io_cursor_open(&i, l); struct tbuf *row; while ((row = log_io_cursor_next(&i))) - h(row); + h(param, row); log_io_cursor_close(&i); log_io_close(&l); diff --git a/src/replica.m b/src/replica.m index 795f0f9470abd2fee0023961ddf8bff3eda87981..121403d0b155a96f7c9d938b31c91a8d8be5eb8b 100644 --- a/src/replica.m +++ b/src/replica.m @@ -152,7 +152,7 @@ remote_apply_row(struct recovery_state *r, struct tbuf *row) data = tbuf_alloc(row->pool); tbuf_append(data, row->data + sizeof(struct header_v11), header_v11(row)->len); - if (r->row_handler(row) < 0) + if (r->row_handler(r->row_handler_param, row) < 0) panic("replication failure: can't apply row"); tag = read_u16(data); diff --git a/src/replication.m b/src/replication.m index c086ccb9b7128f89d1329d06e968878fbed0f90d..8884b29bb0068c8a6f30f40d1a470fe264bf29c1 100644 --- a/src/replication.m +++ b/src/replication.m @@ -135,18 +135,6 @@ spawner_shutdown_children(); static void replication_relay_loop(int client_sock); -/** A libev callback invoked when a relay client socket is ready - * for read. This currently only happens when the client closes - * its socket, and we get an EOF. - */ -static void -replication_relay_recv(struct ev_io *w, int revents); - -/** Send a single row to the client. */ -static int -replication_relay_send_row(struct tbuf *t); - - /* * ------------------------------------------------------------------------ * replication module @@ -577,6 +565,56 @@ retry: } } +/** A libev callback invoked when a relay client socket is ready + * for read. This currently only happens when the client closes + * its socket, and we get an EOF. + */ +static void +replication_relay_recv(struct ev_io *w, int __attribute__((unused)) revents) +{ + int fd = *((int *)w->data); + u8 data; + + int result = recv(fd, &data, sizeof(data), 0); + + if (result == 0 || (result < 0 && errno == ECONNRESET)) { + say_info("the client has closed its replication socket, exiting"); + exit(EXIT_SUCCESS); + } + if (result < 0) + say_syserror("recv"); + + exit(EXIT_FAILURE); +} + + +/** Send a single row to the client. */ +static int +replication_relay_send_row(void *param __attribute__((unused)), struct tbuf *t) +{ + u8 *data = t->data; + ssize_t bytes, len = t->size; + while (len > 0) { + bytes = write(fiber->fd, data, len); + if (bytes < 0) { + if (errno == EPIPE) { + /* socket closed on opposite site */ + goto shutdown_handler; + } + panic_syserror("write"); + } + len -= bytes; + data += bytes; + } + + say_debug("send row: %" PRIu32 " bytes %s", t->size, tbuf_to_hex(t)); + return 0; +shutdown_handler: + say_info("the client has closed its replication socket, exiting"); + exit(EXIT_SUCCESS); +} + + /** The main loop of replication client service process. */ static void replication_relay_loop(int client_sock) @@ -624,7 +662,7 @@ replication_relay_loop(int client_sock) ver = tbuf_alloc(fiber->gc_pool); tbuf_append(ver, &default_version, sizeof(default_version)); - replication_relay_send_row(ver); + replication_relay_send_row(NULL, ver); /* init libev events handlers */ ev_default_loop(0); @@ -637,7 +675,8 @@ replication_relay_loop(int client_sock) ev_io_start(&sock_read_ev); /* Initialize the recovery process */ - recovery_init(cfg.snap_dir, cfg.wal_dir, replication_relay_send_row, + recovery_init(cfg.snap_dir, cfg.wal_dir, + replication_relay_send_row, NULL, INT32_MAX, "fsync_delay", 0, RECOVER_READONLY); /* @@ -657,48 +696,3 @@ replication_relay_loop(int client_sock) exit(EXIT_SUCCESS); } -/** Receive data event to replication socket handler */ -static void -replication_relay_recv(struct ev_io *w, int __attribute__((unused)) revents) -{ - int fd = *((int *)w->data); - u8 data; - - int result = recv(fd, &data, sizeof(data), 0); - - if (result == 0 || (result < 0 && errno == ECONNRESET)) { - say_info("the client has closed its replication socket, exiting"); - exit(EXIT_SUCCESS); - } - if (result < 0) - say_syserror("recv"); - - exit(EXIT_FAILURE); -} - -/** Send to row to client. */ -static int -replication_relay_send_row(struct tbuf *t) -{ - u8 *data = t->data; - ssize_t bytes, len = t->size; - while (len > 0) { - bytes = write(fiber->fd, data, len); - if (bytes < 0) { - if (errno == EPIPE) { - /* socket closed on opposite site */ - goto shutdown_handler; - } - panic_syserror("write"); - } - len -= bytes; - data += bytes; - } - - say_debug("send row: %" PRIu32 " bytes %s", t->size, tbuf_to_hex(t)); - return 0; -shutdown_handler: - say_info("the client has closed its replication socket, exiting"); - exit(EXIT_SUCCESS); -} -