From a48b9108362e34513a911ef7594d641944a06118 Mon Sep 17 00:00:00 2001
From: Konstantin Osipov <kostja@tarantool.org>
Date: Fri, 5 Oct 2012 17:12:36 +0400
Subject: [PATCH] Refactoring: make sure row applier is called with context.

When we recover either from a remote source or
from local log file/snapshot, a module "apply row" function
is invoked. Up until now this function
was invoked with a single argument - the row to apply,
and there were no context which would be passed around.

This worked fine as long as fibers were "all inclusive" --
i.e. contained all possible context which a function may need.

Now, when networking API and IO handles are split from
struct fiber, they need to be passed around explicitly,
in particular, into row apply function.

Prepeare the code base for this.
---
 include/recovery.h |   8 ++--
 mod/box/box.m      |  11 +++--
 src/recovery.m     |  15 +++---
 src/replica.m      |   2 +-
 src/replication.m  | 112 +++++++++++++++++++++------------------------
 5 files changed, 74 insertions(+), 74 deletions(-)

diff --git a/include/recovery.h b/include/recovery.h
index 27e9e0af8e..aaffab17f5 100644
--- a/include/recovery.h
+++ b/include/recovery.h
@@ -39,7 +39,7 @@ struct tbuf;
 
 #define RECOVER_READONLY 1
 
-typedef int (row_handler)(struct tbuf *);
+typedef int (row_handler)(void *, struct tbuf *);
 
 /** A "condition variable" that allows fibers to wait when a given
  * LSN makes it to disk.
@@ -93,6 +93,7 @@ struct recovery_state {
 	 * formats.
 	 */
 	row_handler *row_handler;
+	void *row_handler_param;
 	int snap_io_rate_limit;
 	int rows_per_wal;
 	int flags;
@@ -106,7 +107,7 @@ struct recovery_state {
 extern struct recovery_state *recovery_state;
 
 void recovery_init(const char *snap_dirname, const char *xlog_dirname,
-		   row_handler row_handler,
+		   row_handler row_handler, void *row_handler_param,
 		   int rows_per_wal, const char *wal_mode,
 		   double wal_fsync_delay,
 		   int flags);
@@ -131,7 +132,8 @@ void set_lsn(struct recovery_state *r, int64_t lsn);
 void recovery_wait_lsn(struct recovery_state *r, int64_t lsn);
 
 int read_log(const char *filename,
-	     row_handler xlog_handler, row_handler snap_handler);
+	     row_handler xlog_handler, row_handler snap_handler,
+	     void *param);
 
 void recovery_follow_remote(struct recovery_state *r, const char *addr);
 void recovery_stop_remote(struct recovery_state *r);
diff --git a/mod/box/box.m b/mod/box/box.m
index 7fb633aa71..55bd73856e 100644
--- a/mod/box/box.m
+++ b/mod/box/box.m
@@ -229,7 +229,7 @@ box_xlog_sprint(struct tbuf *buf, const struct tbuf *t)
 }
 
 static int
-snap_print(struct tbuf *t)
+snap_print(void *param __attribute__((unused)), struct tbuf *t)
 {
 	@try {
 		struct tbuf *out = tbuf_alloc(t->pool);
@@ -253,7 +253,7 @@ snap_print(struct tbuf *t)
 }
 
 static int
-xlog_print(struct tbuf *t)
+xlog_print(void *param __attribute__((unused)), struct tbuf *t)
 {
 	@try {
 		struct tbuf *out = tbuf_alloc(t->pool);
@@ -283,7 +283,7 @@ recover_snap_row(struct tbuf *t)
 }
 
 static int
-recover_row(struct tbuf *t)
+recover_row(void *param __attribute__((unused)), struct tbuf *t)
 {
 	/* drop wal header */
 	if (tbuf_peek(t, sizeof(struct header_v11)) == NULL) {
@@ -483,7 +483,8 @@ mod_init(void)
 
 	/* recovery initialization */
 	recovery_init(cfg.snap_dir, cfg.wal_dir,
-		      recover_row, cfg.rows_per_wal, cfg.wal_mode,
+		      recover_row, NULL,
+		      cfg.rows_per_wal, cfg.wal_mode,
 		      cfg.wal_fsync_delay,
 		      init_storage ? RECOVER_READONLY : 0);
 	recovery_update_io_rate_limit(recovery_state, cfg.snap_io_rate_limit);
@@ -538,7 +539,7 @@ mod_init(void)
 int
 mod_cat(const char *filename)
 {
-	return read_log(filename, xlog_print, snap_print);
+	return read_log(filename, xlog_print, snap_print, NULL);
 }
 
 static void
diff --git a/src/recovery.m b/src/recovery.m
index 107b866ac9..5d1cb4607c 100644
--- a/src/recovery.m
+++ b/src/recovery.m
@@ -188,8 +188,9 @@ recovery_stop_local(struct recovery_state *r);
 
 void
 recovery_init(const char *snap_dirname, const char *wal_dirname,
-	      row_handler row_handler, int rows_per_wal,
-	      const char *wal_mode, double wal_fsync_delay, int flags)
+	      row_handler row_handler, void *row_handler_param,
+	      int rows_per_wal, const char *wal_mode,
+	      double wal_fsync_delay, int flags)
 {
 	assert(recovery_state == NULL);
 	recovery_state = p0alloc(eter_pool, sizeof(struct recovery_state));
@@ -200,6 +201,7 @@ recovery_init(const char *snap_dirname, const char *wal_dirname,
 		panic("unacceptable value of 'rows_per_wal'");
 
 	r->row_handler = row_handler;
+	r->row_handler_param = row_handler_param;
 
 	r->snap_dir = &snap_dir;
 	r->snap_dir->dirname = strdup(snap_dirname);
@@ -298,7 +300,7 @@ recover_snap(struct recovery_state *r)
 
 	struct tbuf *row;
 	while ((row = log_io_cursor_next(&i))) {
-		if (r->row_handler(row) < 0) {
+		if (r->row_handler(r->row_handler_param, row) < 0) {
 			say_error("can't apply row");
 			if (snap->dir->panic_if_error)
 				break;
@@ -347,7 +349,7 @@ recover_wal(struct recovery_state *r, struct log_io *l)
 		 * After handler(row) returned, row may be
 		 * modified, do not use it.
 		 */
-		if (r->row_handler(row) < 0) {
+		if (r->row_handler(r->row_handler_param, row) < 0) {
 			say_error("can't apply row");
 			if (l->dir->panic_if_error)
 				goto end;
@@ -1221,7 +1223,8 @@ snapshot_save(struct recovery_state *r,
 
 int
 read_log(const char *filename,
-	 row_handler *xlog_handler, row_handler *snap_handler)
+	 row_handler *xlog_handler, row_handler *snap_handler,
+	 void *param)
 {
 	struct log_dir *dir;
 	row_handler *h;
@@ -1244,7 +1247,7 @@ read_log(const char *filename,
 	log_io_cursor_open(&i, l);
 	struct tbuf *row;
 	while ((row = log_io_cursor_next(&i)))
-		h(row);
+		h(param, row);
 
 	log_io_cursor_close(&i);
 	log_io_close(&l);
diff --git a/src/replica.m b/src/replica.m
index 795f0f9470..121403d0b1 100644
--- a/src/replica.m
+++ b/src/replica.m
@@ -152,7 +152,7 @@ remote_apply_row(struct recovery_state *r, struct tbuf *row)
 	data = tbuf_alloc(row->pool);
 	tbuf_append(data, row->data + sizeof(struct header_v11), header_v11(row)->len);
 
-	if (r->row_handler(row) < 0)
+	if (r->row_handler(r->row_handler_param, row) < 0)
 		panic("replication failure: can't apply row");
 
 	tag = read_u16(data);
diff --git a/src/replication.m b/src/replication.m
index c086ccb9b7..8884b29bb0 100644
--- a/src/replication.m
+++ b/src/replication.m
@@ -135,18 +135,6 @@ spawner_shutdown_children();
 static void
 replication_relay_loop(int client_sock);
 
-/** A libev callback invoked when a relay client socket is ready
- * for read. This currently only happens when the client closes
- * its socket, and we get an EOF.
- */
-static void
-replication_relay_recv(struct ev_io *w, int revents);
-
-/** Send a single row to the client. */
-static int
-replication_relay_send_row(struct tbuf *t);
-
-
 /*
  * ------------------------------------------------------------------------
  * replication module
@@ -577,6 +565,56 @@ retry:
 	}
 }
 
+/** A libev callback invoked when a relay client socket is ready
+ * for read. This currently only happens when the client closes
+ * its socket, and we get an EOF.
+ */
+static void
+replication_relay_recv(struct ev_io *w, int __attribute__((unused)) revents)
+{
+	int fd = *((int *)w->data);
+	u8 data;
+
+	int result = recv(fd, &data, sizeof(data), 0);
+
+	if (result == 0 || (result < 0 && errno == ECONNRESET)) {
+		say_info("the client has closed its replication socket, exiting");
+		exit(EXIT_SUCCESS);
+	}
+	if (result < 0)
+		say_syserror("recv");
+
+	exit(EXIT_FAILURE);
+}
+
+
+/** Send a single row to the client. */
+static int
+replication_relay_send_row(void *param __attribute__((unused)), struct tbuf *t)
+{
+	u8 *data = t->data;
+	ssize_t bytes, len = t->size;
+	while (len > 0) {
+		bytes = write(fiber->fd, data, len);
+		if (bytes < 0) {
+			if (errno == EPIPE) {
+				/* socket closed on opposite site */
+				goto shutdown_handler;
+			}
+			panic_syserror("write");
+		}
+		len -= bytes;
+		data += bytes;
+	}
+
+	say_debug("send row: %" PRIu32 " bytes %s", t->size, tbuf_to_hex(t));
+	return 0;
+shutdown_handler:
+	say_info("the client has closed its replication socket, exiting");
+	exit(EXIT_SUCCESS);
+}
+
+
 /** The main loop of replication client service process. */
 static void
 replication_relay_loop(int client_sock)
@@ -624,7 +662,7 @@ replication_relay_loop(int client_sock)
 
 	ver = tbuf_alloc(fiber->gc_pool);
 	tbuf_append(ver, &default_version, sizeof(default_version));
-	replication_relay_send_row(ver);
+	replication_relay_send_row(NULL, ver);
 
 	/* init libev events handlers */
 	ev_default_loop(0);
@@ -637,7 +675,8 @@ replication_relay_loop(int client_sock)
 	ev_io_start(&sock_read_ev);
 
 	/* Initialize the recovery process */
-	recovery_init(cfg.snap_dir, cfg.wal_dir, replication_relay_send_row,
+	recovery_init(cfg.snap_dir, cfg.wal_dir,
+		      replication_relay_send_row, NULL,
 		      INT32_MAX, "fsync_delay", 0,
 		      RECOVER_READONLY);
 	/*
@@ -657,48 +696,3 @@ replication_relay_loop(int client_sock)
 	exit(EXIT_SUCCESS);
 }
 
-/** Receive data event to replication socket handler */
-static void
-replication_relay_recv(struct ev_io *w, int __attribute__((unused)) revents)
-{
-	int fd = *((int *)w->data);
-	u8 data;
-
-	int result = recv(fd, &data, sizeof(data), 0);
-
-	if (result == 0 || (result < 0 && errno == ECONNRESET)) {
-		say_info("the client has closed its replication socket, exiting");
-		exit(EXIT_SUCCESS);
-	}
-	if (result < 0)
-		say_syserror("recv");
-
-	exit(EXIT_FAILURE);
-}
-
-/** Send to row to client. */
-static int
-replication_relay_send_row(struct tbuf *t)
-{
-	u8 *data = t->data;
-	ssize_t bytes, len = t->size;
-	while (len > 0) {
-		bytes = write(fiber->fd, data, len);
-		if (bytes < 0) {
-			if (errno == EPIPE) {
-				/* socket closed on opposite site */
-				goto shutdown_handler;
-			}
-			panic_syserror("write");
-		}
-		len -= bytes;
-		data += bytes;
-	}
-
-	say_debug("send row: %" PRIu32 " bytes %s", t->size, tbuf_to_hex(t));
-	return 0;
-shutdown_handler:
-	say_info("the client has closed its replication socket, exiting");
-	exit(EXIT_SUCCESS);
-}
-
-- 
GitLab