From 2c0d418bca940d43f05630a7a197f640976370b0 Mon Sep 17 00:00:00 2001
From: Serge Petrenko <sergepetrenko@tarantool.org>
Date: Wed, 13 Mar 2019 13:38:07 +0300
Subject: [PATCH] evio: fix timeout calculations

The function evio_timeout_update() failed to update the starting time
point, so every call subtracted the time elapsed since the original
starting point rather than since the previous call. With consecutive
calls to the function, the same interval was charged against the
remaining delay more than once, and timeouts fired much earlier than
they should have. This led, for example, to the applier timing out
after only 0.2 seconds while reading a row several megabytes in size,
even though replication_timeout was set to 15 seconds.
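
For illustration, here is a minimal self-contained sketch (not part
of the patch) of the old and the fixed update logic. The names
clock_now, timeout_update_old() and timeout_update_new() are made up
for this example; clock_now stands in for ev_monotonic_now(loop):

    #include <stdio.h>

    static double clock_now; /* fake clock, stands in for the loop time */

    /* Old logic: start never advances, so every call subtracts the
     * time elapsed since the original starting point, charging the
     * same interval against the remaining delay more than once. */
    static void
    timeout_update_old(double start, double *delay)
    {
        double elapsed = clock_now - start;
        *delay = (elapsed >= *delay) ? 0 : *delay - elapsed;
    }

    /* Fixed logic: start moves forward, so only the time elapsed
     * since the previous call is subtracted. */
    static void
    timeout_update_new(double *start, double *delay)
    {
        double elapsed = clock_now - *start;
        *start += elapsed;
        *delay = (elapsed >= *delay) ? 0 : *delay - elapsed;
    }

    int
    main(void)
    {
        double start = 0, old_delay = 15, new_delay = 15, new_start = 0;
        /* Two consecutive partial reads, at t = 0.1 s and t = 0.2 s. */
        clock_now = 0.1;
        timeout_update_old(start, &old_delay);      /* charges 0.1 s */
        timeout_update_new(&new_start, &new_delay); /* charges 0.1 s */
        clock_now = 0.2;
        timeout_update_old(start, &old_delay);      /* charges 0.2 s */
        timeout_update_new(&new_start, &new_delay); /* charges 0.1 s */
        printf("old: %.1f, new: %.1f\n", old_delay, new_delay);
        /* Prints "old: 14.7, new: 14.8": the old version charged
         * 0.3 s of timeout for 0.2 s of wall time. When a large row
         * arrives in many small chunks, this error compounds on every
         * iteration, so the delay reaches zero long before the
         * configured timeout. */
        return 0;
    }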

Closes #4042
---
 src/box/xrow_io.cc                         |   4 +-
 src/lib/core/coio.cc                       |  18 ++--
 src/lib/core/coio.h                        |   2 +-
 src/lib/core/evio.h                        |   5 +-
 test/replication/long_row_timeout.result   | 114 +++++++++++++++++++++
 test/replication/long_row_timeout.test.lua |  50 +++++++++
 test/replication/suite.cfg                 |   1 +
 7 files changed, 180 insertions(+), 14 deletions(-)
 create mode 100644 test/replication/long_row_timeout.result
 create mode 100644 test/replication/long_row_timeout.test.lua

diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc
index 4b6e68bc1f..48707982bd 100644
--- a/src/box/xrow_io.cc
+++ b/src/box/xrow_io.cc
@@ -71,7 +71,7 @@ coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
 	/* Read fixed header */
 	if (ibuf_used(in) < 1)
 		coio_breadn_timeout(coio, in, 1, delay);
-	coio_timeout_update(start, &delay);
+	coio_timeout_update(&start, &delay);
 
 	/* Read length */
 	if (mp_typeof(*in->rpos) != MP_UINT) {
@@ -81,7 +81,7 @@ coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
 	ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
 	if (to_read > 0)
 		coio_breadn_timeout(coio, in, to_read, delay);
-	coio_timeout_update(start, &delay);
+	coio_timeout_update(&start, &delay);
 
 	uint32_t len = mp_decode_uint((const char **) &in->rpos);
 
diff --git a/src/lib/core/coio.cc b/src/lib/core/coio.cc
index 6b94cf2a65..e88d724d5b 100644
--- a/src/lib/core/coio.cc
+++ b/src/lib/core/coio.cc
@@ -210,7 +210,7 @@ coio_connect_timeout(struct ev_io *coio, struct uri *uri, struct sockaddr *addr,
 		if (!uri->host_hint) freeaddrinfo(ai);
 		else free(ai_local.ai_addr);
 	});
-	evio_timeout_update(loop(), start, &delay);
+	evio_timeout_update(loop(), &start, &delay);
 
 	coio_timeout_init(&start, &delay, timeout);
 	assert(! evio_has_fd(coio));
@@ -232,7 +232,7 @@ coio_connect_timeout(struct ev_io *coio, struct uri *uri, struct sockaddr *addr,
 		}
 		ai = ai->ai_next;
 		ev_now_update(loop);
-		coio_timeout_update(start, &delay);
+		coio_timeout_update(&start, &delay);
 	}
 
 	tnt_raise(SocketError, sio_socketname(coio->fd), "connection failed");
@@ -278,7 +278,7 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr,
 		fiber_testcancel();
 		if (is_timedout)
 			tnt_raise(TimedOut);
-		coio_timeout_update(start, &delay);
+		coio_timeout_update(&start, &delay);
 	}
 }
 
@@ -338,7 +338,7 @@ coio_read_ahead_timeout(struct ev_io *coio, void *buf, size_t sz,
 		fiber_testcancel();
 		if (is_timedout)
 			tnt_raise(TimedOut);
-		coio_timeout_update(start, &delay);
+		coio_timeout_update(&start, &delay);
 	}
 }
 
@@ -433,7 +433,7 @@ coio_write_timeout(struct ev_io *coio, const void *buf, size_t sz,
 
 		if (is_timedout)
 			tnt_raise(TimedOut);
-		coio_timeout_update(start, &delay);
+		coio_timeout_update(&start, &delay);
 	}
 }
 
@@ -499,7 +499,7 @@ coio_writev_timeout(struct ev_io *coio, struct iovec *iov, int iovcnt,
 
 		if (is_timedout)
 			tnt_raise(TimedOut);
-		coio_timeout_update(start, &delay);
+		coio_timeout_update(&start, &delay);
 	}
 	return total;
 }
@@ -545,7 +545,7 @@ coio_sendto_timeout(struct ev_io *coio, const void *buf, size_t sz, int flags,
 		fiber_testcancel();
 		if (is_timedout)
 			tnt_raise(TimedOut);
-		coio_timeout_update(start, &delay);
+		coio_timeout_update(&start, &delay);
 	}
 }
 
@@ -590,7 +590,7 @@ coio_recvfrom_timeout(struct ev_io *coio, void *buf, size_t sz, int flags,
 		fiber_testcancel();
 		if (is_timedout)
 			tnt_raise(TimedOut);
-		coio_timeout_update(start, &delay);
+		coio_timeout_update(&start, &delay);
 	}
 }
 
@@ -759,7 +759,7 @@ coio_write_fd_timeout(int fd, const void *data, size_t size, ev_tstamp timeout)
 		}
 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
 			coio_wait(fd, COIO_WRITE, delay);
-			evio_timeout_update(loop, start, &delay);
+			evio_timeout_update(loop, &start, &delay);
 		} else if (errno != EINTR) {
 			diag_set(SocketError, sio_socketname(fd), "write");
 			return -1;
diff --git a/src/lib/core/coio.h b/src/lib/core/coio.h
index 104abf7206..6a23376897 100644
--- a/src/lib/core/coio.h
+++ b/src/lib/core/coio.h
@@ -88,7 +88,7 @@ coio_timeout_init(ev_tstamp *start, ev_tstamp *delay,
 }
 
 static inline void
-coio_timeout_update(ev_tstamp start, ev_tstamp *delay)
+coio_timeout_update(ev_tstamp *start, ev_tstamp *delay)
 {
 	return evio_timeout_update(loop(), start, delay);
 }
diff --git a/src/lib/core/evio.h b/src/lib/core/evio.h
index bd48235bca..c86be9e6a8 100644
--- a/src/lib/core/evio.h
+++ b/src/lib/core/evio.h
@@ -148,9 +148,10 @@ evio_timeout_init(ev_loop *loop, ev_tstamp *start, ev_tstamp *delay,
 }
 
 static inline void
-evio_timeout_update(ev_loop *loop, ev_tstamp start, ev_tstamp *delay)
+evio_timeout_update(ev_loop *loop, ev_tstamp *start, ev_tstamp *delay)
 {
-	ev_tstamp elapsed = ev_monotonic_now(loop) - start;
+	ev_tstamp elapsed = ev_monotonic_now(loop) - *start;
+	*start += elapsed;
 	*delay = (elapsed >= *delay) ? 0 : *delay - elapsed;
 }
 
diff --git a/test/replication/long_row_timeout.result b/test/replication/long_row_timeout.result
new file mode 100644
index 0000000000..5b5a46d510
--- /dev/null
+++ b/test/replication/long_row_timeout.result
@@ -0,0 +1,114 @@
+fiber = require('fiber')
+---
+...
+digest = require('digest')
+---
+...
+test_run = require('test_run').new()
+---
+...
+--
+-- gh-4042 applier read times out too fast when reading large rows.
+--
+box.schema.user.grant('guest', 'replication')
+---
+...
+test_run:cmd('create server replica with rpl_master=default, script="replication/replica.lua"')
+---
+- true
+...
+test_run:cmd('start server replica')
+---
+- true
+...
+box.info.replication[2].downstream.status
+---
+- follow
+...
+default_memtx_max_tuple_size = box.cfg.memtx_max_tuple_size
+---
+...
+test_run:cmd('switch replica')
+---
+- true
+...
+box.cfg{memtx_max_tuple_size = 21 * 1024 * 1024}
+---
+...
+test_run:cmd('switch default')
+---
+- true
+...
+box.cfg{memtx_max_tuple_size = 21 * 1024 * 1024}
+---
+...
+-- Insert some big rows that cannot be read in one go, so the applier
+-- yields on read a couple of times.
+s = box.schema.space.create('test')
+---
+...
+_ = s:create_index('pk')
+---
+...
+for i = 1,5 do box.space.test:replace{1, digest.urandom(20 * 1024 * 1024)} collectgarbage('collect') end
+---
+...
+-- replication_disconnect_timeout is 4 * replication_timeout; check that
+-- the replica doesn't time out too early.
+test_run:cmd('setopt delimiter ";"')
+---
+- true
+...
+ok = true;
+---
+...
+start = fiber.time();
+---
+...
+while fiber.time() - start < 3 * box.cfg.replication_timeout do
+    if box.info.replication[2].downstream.status ~= 'follow' then
+        ok = false
+        break
+    end
+    fiber.sleep(0.001)
+end;
+---
+...
+test_run:cmd('setopt delimiter ""');
+---
+- true
+...
+ok
+---
+- true
+...
+s:drop()
+---
+...
+test_run:cmd('stop server replica')
+---
+- true
+...
+test_run:cmd('cleanup server replica')
+---
+- true
+...
+test_run:cmd('delete server replica')
+---
+- true
+...
+test_run:cleanup_cluster()
+---
+...
+box.cfg{memtx_max_tuple_size = default_memtx_max_tuple_size}
+---
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+-- Rotate xlogs so as not to replicate the huge rows in
+-- the following tests.
+box.snapshot()
+---
+- ok
+...
diff --git a/test/replication/long_row_timeout.test.lua b/test/replication/long_row_timeout.test.lua
new file mode 100644
index 0000000000..6e1d38b112
--- /dev/null
+++ b/test/replication/long_row_timeout.test.lua
@@ -0,0 +1,50 @@
+fiber = require('fiber')
+digest = require('digest')
+test_run = require('test_run').new()
+
+--
+-- gh-4042 applier read times out too fast when reading large rows.
+--
+box.schema.user.grant('guest', 'replication')
+test_run:cmd('create server replica with rpl_master=default, script="replication/replica.lua"')
+test_run:cmd('start server replica')
+box.info.replication[2].downstream.status
+
+default_memtx_max_tuple_size = box.cfg.memtx_max_tuple_size
+test_run:cmd('switch replica')
+box.cfg{memtx_max_tuple_size = 21 * 1024 * 1024}
+test_run:cmd('switch default')
+box.cfg{memtx_max_tuple_size = 21 * 1024 * 1024}
+
+-- Insert some big rows that cannot be read in one go, so the applier
+-- yields on read a couple of times.
+s = box.schema.space.create('test')
+_ = s:create_index('pk')
+for i = 1,5 do box.space.test:replace{1, digest.urandom(20 * 1024 * 1024)} collectgarbage('collect') end
+-- replication_disconnect_timeout is 4 * replication_timeout; check that
+-- the replica doesn't time out too early.
+test_run:cmd('setopt delimiter ";"')
+ok = true;
+start = fiber.time();
+while fiber.time() - start < 3 * box.cfg.replication_timeout do
+    if box.info.replication[2].downstream.status ~= 'follow' then
+        ok = false
+        break
+    end
+    fiber.sleep(0.001)
+end;
+test_run:cmd('setopt delimiter ""');
+
+ok
+
+s:drop()
+test_run:cmd('stop server replica')
+test_run:cmd('cleanup server replica')
+test_run:cmd('delete server replica')
+test_run:cleanup_cluster()
+box.cfg{memtx_max_tuple_size = default_memtx_max_tuple_size}
+box.schema.user.revoke('guest', 'replication')
+
+-- Rotate xlogs so as not to replicate the huge rows in
+-- the following tests.
+box.snapshot()
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 5e88097311..91e884ece6 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -9,6 +9,7 @@
     "wal_rw_stress.test.lua": {},
     "force_recovery.test.lua": {},
     "on_schema_init.test.lua": {},
+    "long_row_timeout.test.lua": {},
     "*": {
         "memtx": {"engine": "memtx"},
         "vinyl": {"engine": "vinyl"}
-- 
GitLab