diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc
index 4b6e68bc1f3e6dd0a1353b92c71d225c7c2162f4..48707982bd2120db59c3ab1ce89e167074c02a29 100644
--- a/src/box/xrow_io.cc
+++ b/src/box/xrow_io.cc
@@ -71,7 +71,7 @@ coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
 	/* Read fixed header */
 	if (ibuf_used(in) < 1)
 		coio_breadn_timeout(coio, in, 1, delay);
-	coio_timeout_update(start, &delay);
+	coio_timeout_update(&start, &delay);
 
 	/* Read length */
 	if (mp_typeof(*in->rpos) != MP_UINT) {
@@ -81,7 +81,7 @@ coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
 	ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
 	if (to_read > 0)
 		coio_breadn_timeout(coio, in, to_read, delay);
-	coio_timeout_update(start, &delay);
+	coio_timeout_update(&start, &delay);
 
 	uint32_t len = mp_decode_uint((const char **) &in->rpos);
 
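coio_read_xrow_timeout_xc() above first waits for a single byte, then asks mp_check_uint() how many more bytes of the row's MessagePack-encoded length prefix are still missing and waits again, so even the header read can yield more than once — which is why the timeout bookkeeping between those waits matters. Below is a rough standalone sketch of the prefix arithmetic only; uint_tail_len() is an invented helper for illustration, and the real mp_check_uint() has a different interface:

#include <stdint.h>
#include <stdio.h>

/*
 * How many bytes follow the first byte of a MessagePack unsigned
 * integer. Invented stand-in for the role mp_check_uint() plays in
 * coio_read_xrow_timeout_xc(); not a Tarantool API.
 */
static int
uint_tail_len(uint8_t first)
{
	if (first <= 0x7f)
		return 0;	/* fixuint: the value fits in the tag byte */
	switch (first) {
	case 0xcc: return 1;	/* uint 8 */
	case 0xcd: return 2;	/* uint 16 */
	case 0xce: return 4;	/* uint 32 */
	case 0xcf: return 8;	/* uint 64 */
	default:   return -1;	/* not MP_UINT: invalid packet length */
	}
}

int
main(void)
{
	/* 0xce starts a 5-byte uint 32 prefix, e.g. a ~20 MiB row length. */
	printf("prefix bytes still to read: %d\n", uint_tail_len(0xce)); /* 4 */
	return 0;
}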
diff --git a/src/lib/core/coio.cc b/src/lib/core/coio.cc
index 6b94cf2a654ba11d040e28b9e8e98f475faf1c28..e88d724d5b74c56ea901e63986d1477c61d478a1 100644
--- a/src/lib/core/coio.cc
+++ b/src/lib/core/coio.cc
@@ -210,7 +210,7 @@ coio_connect_timeout(struct ev_io *coio, struct uri *uri, struct sockaddr *addr,
 		if (!uri->host_hint) freeaddrinfo(ai);
 		else free(ai_local.ai_addr);
 	});
-	evio_timeout_update(loop(), start, &delay);
+	evio_timeout_update(loop(), &start, &delay);
 
 	coio_timeout_init(&start, &delay, timeout);
 	assert(! evio_has_fd(coio));
@@ -232,7 +232,7 @@ coio_connect_timeout(struct ev_io *coio, struct uri *uri, struct sockaddr *addr,
 		}
 		ai = ai->ai_next;
 		ev_now_update(loop);
-		coio_timeout_update(start, &delay);
+		coio_timeout_update(&start, &delay);
 	}
 
 	tnt_raise(SocketError, sio_socketname(coio->fd), "connection failed");
@@ -278,7 +278,7 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr,
 		fiber_testcancel();
 		if (is_timedout)
 			tnt_raise(TimedOut);
-		coio_timeout_update(start, &delay);
+		coio_timeout_update(&start, &delay);
 	}
 }
 
@@ -338,7 +338,7 @@ coio_read_ahead_timeout(struct ev_io *coio, void *buf, size_t sz,
 		fiber_testcancel();
 		if (is_timedout)
 			tnt_raise(TimedOut);
-		coio_timeout_update(start, &delay);
+		coio_timeout_update(&start, &delay);
 	}
 }
 
@@ -433,7 +433,7 @@ coio_write_timeout(struct ev_io *coio, const void *buf, size_t sz,
 		if (is_timedout)
 			tnt_raise(TimedOut);
 
-		coio_timeout_update(start, &delay);
+		coio_timeout_update(&start, &delay);
 	}
 }
@@ -499,7 +499,7 @@ coio_writev_timeout(struct ev_io *coio, struct iovec *iov, int iovcnt,
 		if (is_timedout)
 			tnt_raise(TimedOut);
 
-		coio_timeout_update(start, &delay);
+		coio_timeout_update(&start, &delay);
 	}
 	return total;
 }
@@ -545,7 +545,7 @@ coio_sendto_timeout(struct ev_io *coio, const void *buf, size_t sz, int flags,
 		fiber_testcancel();
 		if (is_timedout)
 			tnt_raise(TimedOut);
-		coio_timeout_update(start, &delay);
+		coio_timeout_update(&start, &delay);
 	}
 }
 
@@ -590,7 +590,7 @@ coio_recvfrom_timeout(struct ev_io *coio, void *buf, size_t sz, int flags,
 		fiber_testcancel();
 		if (is_timedout)
 			tnt_raise(TimedOut);
-		coio_timeout_update(start, &delay);
+		coio_timeout_update(&start, &delay);
 	}
 }
 
@@ -759,7 +759,7 @@ coio_write_fd_timeout(int fd, const void *data, size_t size, ev_tstamp timeout)
 		}
 		if (errno == EAGAIN || errno == EWOULDBLOCK) {
 			coio_wait(fd, COIO_WRITE, delay);
-			evio_timeout_update(loop, start, &delay);
+			evio_timeout_update(loop, &start, &delay);
 		} else if (errno != EINTR) {
 			diag_set(SocketError, sio_socketname(fd), "write");
 			return -1;
diff --git a/src/lib/core/coio.h b/src/lib/core/coio.h
index 104abf720640037196ec06521cc4ea7d2fb54c7a..6a23376897279b5eb47d05dbce407479a1a48e8e 100644
--- a/src/lib/core/coio.h
+++ b/src/lib/core/coio.h
@@ -88,7 +88,7 @@ coio_timeout_init(ev_tstamp *start, ev_tstamp *delay,
 }
 
 static inline void
-coio_timeout_update(ev_tstamp start, ev_tstamp *delay)
+coio_timeout_update(ev_tstamp *start, ev_tstamp *delay)
 {
 	return evio_timeout_update(loop(), start, delay);
 }
diff --git a/src/lib/core/evio.h b/src/lib/core/evio.h
index bd48235bca36ca0a1243c9e892d830429fe04b19..c86be9e6a8454804fad53cdd7b150b43ff7c87d5 100644
--- a/src/lib/core/evio.h
+++ b/src/lib/core/evio.h
@@ -148,9 +148,10 @@ evio_timeout_init(ev_loop *loop, ev_tstamp *start, ev_tstamp *delay,
 }
 
 static inline void
-evio_timeout_update(ev_loop *loop, ev_tstamp start, ev_tstamp *delay)
+evio_timeout_update(ev_loop *loop, ev_tstamp *start, ev_tstamp *delay)
 {
-	ev_tstamp elapsed = ev_monotonic_now(loop) - start;
+	ev_tstamp elapsed = ev_monotonic_now(loop) - *start;
+	*start += elapsed;
 	*delay = (elapsed >= *delay) ? 0 : *delay - elapsed;
 }
 
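The evio.h hunk carries the actual fix. With start passed by value, each wakeup computed the elapsed time from the original start of the wait and subtracted it from a delay that earlier wakeups had already shortened, so the same waiting time was charged more than once and the deadline fired early on multi-part reads (gh-4042). Advancing *start makes every update charge only the slice since the previous wakeup. A minimal standalone sketch of the difference, with fake_now as an invented stand-in for ev_monotonic_now(loop):

#include <stdio.h>

static double fake_now; /* pretend monotonic clock, seconds */

/* Old behaviour: 'start' passed by value, never advanced. */
static void
timeout_update_old(double start, double *delay)
{
	double elapsed = fake_now - start;
	*delay = (elapsed >= *delay) ? 0 : *delay - elapsed;
}

/* New behaviour: 'start' advanced so only the newest slice is charged. */
static void
timeout_update_new(double *start, double *delay)
{
	double elapsed = fake_now - *start;
	*start += elapsed;
	*delay = (elapsed >= *delay) ? 0 : *delay - elapsed;
}

int
main(void)
{
	double start_old = 0, delay_old = 10;
	double start_new = 0, delay_new = 10;
	/* Two partial reads, waking up at t = 2 and t = 4. */
	for (fake_now = 2; fake_now <= 4; fake_now += 2) {
		timeout_update_old(start_old, &delay_old);
		timeout_update_new(&start_new, &delay_new);
	}
	/* 4 of 10 seconds are spent, so 6 should remain. */
	printf("old: %g s left\n", delay_old); /* 4: seconds 0-2 charged twice */
	printf("new: %g s left\n", delay_new); /* 6 */
	return 0;
}

Since elapsed is ev_monotonic_now(loop) - *start, the increment leaves *start equal to the current time, so the (start, delay) pair always describes exactly the remaining timeout window.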
diff --git a/test/replication/long_row_timeout.result b/test/replication/long_row_timeout.result
new file mode 100644
index 0000000000000000000000000000000000000000..5b5a46d5102e7ab9679ca7074298e215292e1a0c
--- /dev/null
+++ b/test/replication/long_row_timeout.result
@@ -0,0 +1,114 @@
+fiber = require('fiber')
+---
+...
+digest = require('digest')
+---
+...
+test_run = require('test_run').new()
+---
+...
+--
+-- gh-4042 applier read times out too fast when reading large rows.
+--
+box.schema.user.grant('guest', 'replication')
+---
+...
+test_run:cmd('create server replica with rpl_master=default, script="replication/replica.lua"')
+---
+- true
+...
+test_run:cmd('start server replica')
+---
+- true
+...
+box.info.replication[2].downstream.status
+---
+- follow
+...
+default_memtx_max_tuple_size = box.cfg.memtx_max_tuple_size
+---
+...
+test_run:cmd('switch replica')
+---
+- true
+...
+box.cfg{memtx_max_tuple_size = 21 * 1024 * 1024}
+---
+...
+test_run:cmd('switch default')
+---
+- true
+...
+box.cfg{memtx_max_tuple_size = 21 * 1024 * 1024}
+---
+...
+-- insert some big rows which cannot be read in one go, so applier yields
+-- on read a couple of times.
+s = box.schema.space.create('test')
+---
+...
+_ = s:create_index('pk')
+---
+...
+for i = 1,5 do box.space.test:replace{1, digest.urandom(20 * 1024 * 1024)} collectgarbage('collect') end
+---
+...
+-- replication_disconnect_timeout is 4 * replication_timeout, check that
+-- replica doesn't time out too early.
+test_run:cmd('setopt delimiter ";"')
+---
+- true
+...
+ok = true;
+---
+...
+start = fiber.time();
+---
+...
+while fiber.time() - start < 3 * box.cfg.replication_timeout do
+    if box.info.replication[2].downstream.status ~= 'follow' then
+        ok = false
+        break
+    end
+    fiber.sleep(0.001)
+end;
+---
+...
+test_run:cmd('setopt delimiter ""');
+---
+- true
+...
+ok
+---
+- true
+...
+s:drop()
+---
+...
+test_run:cmd('stop server replica')
+---
+- true
+...
+test_run:cmd('cleanup server replica')
+---
+- true
+...
+test_run:cmd('delete server replica')
+---
+- true
+...
+test_run:cleanup_cluster()
+---
+...
+box.cfg{memtx_max_tuple_size = default_memtx_max_tuple_size}
+---
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+-- Rotate xlogs so as not to replicate the huge rows in
+-- the following tests.
+box.snapshot()
+---
+- ok
+...
diff --git a/test/replication/long_row_timeout.test.lua b/test/replication/long_row_timeout.test.lua
new file mode 100644
index 0000000000000000000000000000000000000000..6e1d38b11294281a29dc9ad6fa11d228903f82de
--- /dev/null
+++ b/test/replication/long_row_timeout.test.lua
@@ -0,0 +1,50 @@
+fiber = require('fiber')
+digest = require('digest')
+test_run = require('test_run').new()
+
+--
+-- gh-4042 applier read times out too fast when reading large rows.
+--
+box.schema.user.grant('guest', 'replication')
+test_run:cmd('create server replica with rpl_master=default, script="replication/replica.lua"')
+test_run:cmd('start server replica')
+box.info.replication[2].downstream.status
+
+default_memtx_max_tuple_size = box.cfg.memtx_max_tuple_size
+test_run:cmd('switch replica')
+box.cfg{memtx_max_tuple_size = 21 * 1024 * 1024}
+test_run:cmd('switch default')
+box.cfg{memtx_max_tuple_size = 21 * 1024 * 1024}
+
+-- insert some big rows which cannot be read in one go, so applier yields
+-- on read a couple of times.
+s = box.schema.space.create('test')
+_ = s:create_index('pk')
+for i = 1,5 do box.space.test:replace{1, digest.urandom(20 * 1024 * 1024)} collectgarbage('collect') end
+-- replication_disconnect_timeout is 4 * replication_timeout, check that
+-- replica doesn't time out too early.
+test_run:cmd('setopt delimiter ";"')
+ok = true;
+start = fiber.time();
+while fiber.time() - start < 3 * box.cfg.replication_timeout do
+    if box.info.replication[2].downstream.status ~= 'follow' then
+        ok = false
+        break
+    end
+    fiber.sleep(0.001)
+end;
+test_run:cmd('setopt delimiter ""');
+
+ok
+
+s:drop()
+test_run:cmd('stop server replica')
+test_run:cmd('cleanup server replica')
+test_run:cmd('delete server replica')
+test_run:cleanup_cluster()
+box.cfg{memtx_max_tuple_size = default_memtx_max_tuple_size}
+box.schema.user.revoke('guest', 'replication')
+
+-- Rotate xlogs so as not to replicate the huge rows in
+-- the following tests.
+box.snapshot()
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 5e880973111a35b27bf5c87804539f1c9fdeec5c..91e884ece659035708da9d9e7c53ce4561024bba 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -9,6 +9,7 @@
     "wal_rw_stress.test.lua": {},
     "force_recovery.test.lua": {},
    "on_schema_init.test.lua": {},
+    "long_row_timeout.test.lua": {},
     "*": {
        "memtx": {"engine": "memtx"},
        "vinyl": {"engine": "vinyl"}