diff --git a/include/latch.h b/include/latch.h deleted file mode 100644 index 438bd41a22952f80b79614f5b9817a23a4dcd14e..0000000000000000000000000000000000000000 --- a/include/latch.h +++ /dev/null @@ -1,72 +0,0 @@ -#ifndef TARANTOOL_LATCH_H_INCLUDED -#define TARANTOOL_LATCH_H_INCLUDED -/* - * Redistribution and use in source and binary forms, with or - * without modification, are permitted provided that the following - * conditions are met: - * - * 1. Redistributions of source code must retain the above - * copyright notice, this list of conditions and the - * following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following - * disclaimer in the documentation and/or other materials - * provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED - * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL - * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, - * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL - * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR - * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF - * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF - * SUCH DAMAGE. - */ -#include <stdbool.h> - -struct fiber; - -/* - * Internal implementation of a container for a mutex like object - * with similar interface. It's used boolean variable because of - * single threaded nature of tarantool. But it's rather simple to change - * this variable to a mutex object to maintain multi threaded approach. - */ -struct tnt_latch { - bool locked; - - struct fiber *owner; -}; - -/** - * Initialize the given latch. - * - * @param latch Latch to be initialized. - */ -void tnt_latch_create(struct tnt_latch *latch); -/** - * Destroy the given latch. - */ -void tnt_latch_destroy(struct tnt_latch *latch); -/** - * Set the latch to the locked state. If it's already locked - * returns -1 value immediately otherwise returns 0. - * - * @param latch Latch to be locked. - */ -int tnt_latch_trylock(struct tnt_latch *latch); -/** - * Unlock the locked latch. - * - * @param latch Latch to be unlocked. - */ -void tnt_latch_unlock(struct tnt_latch *latch); - - -#endif /* TARANTOOL_LATCH_H_INCLUDED */ diff --git a/include/mutex.h b/include/mutex.h new file mode 100644 index 0000000000000000000000000000000000000000..eca995334b22e1977fe26e512783fe196474e49a --- /dev/null +++ b/include/mutex.h @@ -0,0 +1,140 @@ +#ifndef TARANTOOL_MUTEX_H_INCLUDED +#define TARNATOOL_MUTEX_H_INCLUDED +/* + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include <assert.h> +#include <rlist.h> + +/** Mutex of cooperative multitasking environment. */ + +struct mutex +{ + /** + * The queue of fibers waiting on a mutex. + * The first fiber owns the mutex. + */ + struct rlist queue; +}; + +/** + * Initialize the given mutex. + * + * @param m mutex to be initialized. + */ +static inline void +mutex_create(struct mutex *m) +{ + rlist_create(&m->queue); +} + +static inline void +mutex_destroy(struct mutex *m) +{ + while (!rlist_empty(&m->queue)) { + struct fiber *f = rlist_first_entry(&m->queue, + struct fiber, state); + rlist_del_entry(f, state); + } +} + +/** + * Lock a mutex. If the mutex is already locked by another fiber, + * waits for timeout. + * + * @param m mutex to be locked. + * + * @retval false success + * @retval true timeout + */ +static inline bool +mutex_lock_timeout(struct mutex *m, ev_tstamp timeout) +{ + rlist_add_tail_entry(&m->queue, fiber, state); + ev_tstamp start = timeout; + while (timeout > 0) { + struct fiber *f = rlist_first_entry(&m->queue, + struct fiber, state); + if (f == fiber) + break; + + fiber_yield_timeout(timeout); + timeout -= ev_now() - start; + if (timeout <= 0) { + rlist_del_entry(fiber, state); + errno = ETIMEDOUT; + return true; + } + } + return false; +} + +/** + * Lock a mutex (no timeout). Waits indefinitely until + * the current fiber can gain access to the mutex. + */ +static inline void +mutex_lock(struct mutex *m) +{ + (void) mutex_lock_timeout(m, TIMEOUT_INFINITY); +} + +/** + * Try to lock a mutex. Return immediately if the mutex is locked. + * @retval false success + * @retval true the mutex is locked. + */ +static inline bool +mutex_trylock(struct mutex *m) +{ + if (rlist_empty(&m->queue)) { + mutex_lock(m); + return false; + } + return true; +} + +/** + * Unlock a mutex. The fiber calling this function must + * own the mutex. + */ +static inline void +mutex_unlock(struct mutex *m) +{ + struct fiber *f; + f = rlist_first_entry(&m->queue, struct fiber, state); + assert(f == fiber); + rlist_del_entry(f, state); + if (!rlist_empty(&m->queue)) { + f = rlist_first_entry(&m->queue, struct fiber, state); + fiber_wakeup(f); + } +} + +#endif /* TARANTOOL_MUTEX_H_INCLUDED */ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ae8b99a0d66b5d4371aa13df9b30e3431180dcf4..9a3b88639318eeabde5ddb4867f9f2aedc051b30 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -94,7 +94,6 @@ set (common_sources exception.m errcode.c errinj.m - latch.m fio.c crc32.c rope.c diff --git a/src/latch.m b/src/latch.m deleted file mode 100644 index ff08a1ac04d02cfffdb6ee49140ac9a8db479e12..0000000000000000000000000000000000000000 --- a/src/latch.m +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Redistribution and use in source and binary forms, with or - * without modification, are permitted provided that the following - * conditions are met: - * - * 1. Redistributions of source code must retain the above - * copyright notice, this list of conditions and the - * following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following - * disclaimer in the documentation and/or other materials - * provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED - * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL - * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, - * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL - * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR - * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF - * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF - * SUCH DAMAGE. - */ -#include "latch.h" -#include "fiber.h" - -void -tnt_latch_create(struct tnt_latch *latch) -{ - latch->locked = false; - latch->owner = NULL; -} - -void -tnt_latch_destroy(struct tnt_latch *latch) -{ - assert(latch->locked == false); - - latch->owner = NULL; -} - -int -tnt_latch_trylock(struct tnt_latch *latch) -{ - if (latch->locked) { - assert(latch->owner != fiber); - - return -1; - } - - assert(latch->owner == NULL); - - latch->locked = true; - latch->owner = fiber; - - return 0; -} - -void -tnt_latch_unlock(struct tnt_latch *latch) -{ - assert(latch->owner == fiber); - - latch->locked = false; - latch->owner = NULL; -} diff --git a/src/lua/lua_socket.m b/src/lua/lua_socket.m index e49e7b6397f1c3bdfdfcfb5fe0e31e87b2834728..c341be5ed9095839e73e89e0ee806e6e2de53854 100644 --- a/src/lua/lua_socket.m +++ b/src/lua/lua_socket.m @@ -47,6 +47,7 @@ #include "tbuf.h" #include <lua/init.h> #include <stdlib.h> +#include <mutex.h> static const char socketlib_name[] = "box.socket"; @@ -66,7 +67,10 @@ enum bio_status { }; struct bio_socket { - struct ev_io coio; + struct ev_io io_r; + struct ev_io io_w; + struct mutex io_r_mutex; + struct mutex io_w_mutex; struct iobuf *iob; /** SOCK_DGRAM or SOCK_STREAM */ int socktype; @@ -79,10 +83,13 @@ bio_pushsocket(struct lua_State *L, int socktype) struct bio_socket *s = lua_newuserdata(L, sizeof(struct bio_socket)); luaL_getmetatable(L, socketlib_name); lua_setmetatable(L, -2); - coio_init(&s->coio); + coio_init(&s->io_r); + coio_init(&s->io_w); s->socktype = socktype; s->iob = NULL; s->error = 0; + mutex_create(&s->io_r_mutex); + mutex_create(&s->io_w_mutex); /* * Do not create a file descriptor yet. Thanks to ipv6, * socket family is not known until host name is resolved. @@ -104,7 +111,7 @@ static inline struct bio_socket * bio_checkactivesocket(struct lua_State *L, int narg) { struct bio_socket *s = bio_checksocket(L, narg); - if (! evio_is_active(&s->coio)) + if (! evio_is_active(&s->io_w)) luaL_error(L, "box.socket: socket is not initialized"); return s; } @@ -128,7 +135,7 @@ bio_initbuf(struct bio_socket *s) char name[PALLOC_POOL_NAME_MAXLEN]; const char *type = s->socktype == SOCK_STREAM ? "tcp" : "udp"; snprintf(name, sizeof(name), "box.io.%s(%d)", - type, s->coio.fd); + type, s->io_w.fd); s->iob = iobuf_new(name); } @@ -197,7 +204,7 @@ static int lbox_socket_tostring(struct lua_State *L) { struct bio_socket *s = bio_checksocket(L, -1); - lua_pushstring(L, sio_socketname(s->coio.fd)); + lua_pushstring(L, sio_socketname(s->io_w.fd)); return 1; } @@ -233,13 +240,18 @@ static int lbox_socket_close(struct lua_State *L) { struct bio_socket *s = bio_checksocket(L, -1); - if (! evio_is_active(&s->coio)) + if (! evio_is_active(&s->io_w)) return 0; if (s->iob) { iobuf_delete(s->iob); s->iob = NULL; } - evio_close(&s->coio); + ev_io_stop(&s->io_r); + evio_close(&s->io_w); + s->io_r.fd = s->io_w.fd; + assert(s->io_r.fd == -1); + mutex_destroy(&s->io_r_mutex); + mutex_destroy(&s->io_w_mutex); bio_clearerr(s); return 0; } @@ -258,7 +270,7 @@ lbox_socket_shutdown(struct lua_State *L) struct bio_socket *s = bio_checkactivesocket(L, -1); int how = luaL_checkint(L, 2); bio_clearerr(s); - if (shutdown(s->coio.fd, how)) + if (shutdown(s->io_w.fd, how)) return bio_pushsockerror(L, s, errno); /* case #1: Success */ lua_settop(L, 1); @@ -295,7 +307,7 @@ lbox_socket_connect(struct lua_State *L) double timeout = TIMEOUT_INFINITY; if (lua_gettop(L) == 4) timeout = luaL_checknumber(L, 4); - if (evio_is_active(&s->coio)) + if (evio_is_active(&s->io_w)) return bio_pushsockerror(L, s, EALREADY); bio_clearerr(s); @@ -309,8 +321,10 @@ lbox_socket_connect(struct lua_State *L) evio_timeout_update(start, &delay); @try { /* connect to a first available host */ - if (coio_connect_addrinfo(&s->coio, ai, delay)) + if (coio_connect_addrinfo(&s->io_w, ai, delay)) return bio_pushsockerror(L, s, ETIMEDOUT); + /* set coio reader socket */ + s->io_r.fd = s->io_w.fd; } @catch (SocketError *e) { return bio_pushsockerror(L, s, errno); } @finally { @@ -348,13 +362,28 @@ lbox_socket_send(struct lua_State *L) if (s->iob == NULL) return bio_pushsenderror(L, s, 0, ENOTCONN); bio_clearerr(s); + + /* acquire write lock */ + ev_tstamp start, delay; + evio_timeout_init(&start, &delay, timeout); + bool timed_out = mutex_lock_timeout(&s->io_w_mutex, delay); + if (timed_out) + return bio_pushsenderror(L, s, 0, ETIMEDOUT); + evio_timeout_update(start, &delay); + + int rc; @try { - ssize_t nwr = coio_write_timeout(&s->coio, buf, buf_size, - timeout); - if (nwr < buf_size) - return bio_pushsenderror(L, s, nwr, ETIMEDOUT); + ssize_t nwr = coio_write_timeout(&s->io_w, buf, buf_size, + delay); + if (nwr < buf_size) { + rc = bio_pushsenderror(L, s, nwr, ETIMEDOUT); + return rc; + } } @catch (SocketError *e) { - return bio_pushsenderror(L, s, 0, errno); + rc = bio_pushsenderror(L, s, 0, errno); + return rc; + } @finally { + mutex_unlock(&s->io_w_mutex); } /* case #1: Success */ lua_pushinteger(L, buf_size); @@ -386,6 +415,7 @@ lbox_socket_recv(struct lua_State *L) return bio_pushrecverror(L, s, ENOTCONN); /* Clear possible old timeout status. */ bio_clearerr(s); + /* * Readahead buffer can contain sufficient amount of * data from the previous call to cover the required read @@ -398,18 +428,33 @@ lbox_socket_recv(struct lua_State *L) ssize_t to_read = sz - ibuf_size(in); if (to_read > 0) { + int rc; + /* acquire read lock */ + ev_tstamp start, delay; + evio_timeout_init(&start, &delay, timeout); + bool timed_out = mutex_lock_timeout(&s->io_r_mutex, delay); + if (timed_out) + return bio_pushrecverror(L, s, ETIMEDOUT); + evio_timeout_update(start, &delay); + to_read = sz - ibuf_size(in); + ssize_t nrd; @try { - nrd = coio_bread_timeout(&s->coio, in, to_read, - timeout); + nrd = coio_bread_timeout(&s->io_r, in, to_read, + delay); } @catch (SocketError *e) { - return bio_pushrecverror(L, s, errno); + rc = bio_pushrecverror(L, s, errno); + return rc; + } @finally { + mutex_unlock(&s->io_r_mutex); } if (nrd < to_read) { /* timeout or EOF. */ if (errno == ETIMEDOUT) - return bio_pushrecverror(L, s, ETIMEDOUT); - return bio_pusheof(L, s); + rc = bio_pushrecverror(L, s, ETIMEDOUT); + else + rc = bio_pusheof(L, s); + return rc; } } lua_pushlstring(L, in->pos, sz); @@ -575,9 +620,18 @@ lbox_socket_readline(struct lua_State *L) if (rs_size == 0) luaL_error(L, "box.io.readline: bad separator table"); + /* acquire read lock */ + ev_tstamp start, delay; + evio_timeout_init(&start, &delay, timeout); + bool timed_out = mutex_lock_timeout(&s->io_r_mutex, delay); + if (timed_out) + return bio_pushrecverror(L, s, ETIMEDOUT); + evio_timeout_update(start, &delay); + size_t bottom = 0; int match; struct ibuf *in = &s->iob->in; + int rc; @try { /* readline implementation uses a simple state machine @@ -587,11 +641,7 @@ lbox_socket_readline(struct lua_State *L) palloc(in->pool, sizeof(struct readline_state) * rs_size); readline_state_init(L, rs, seplist); - ev_tstamp start, delay; - evio_timeout_init(&start, &delay, timeout); - while (1) { - /* case #4: user limit reached */ if (bottom == limit) { lua_pushlstring(L, in->pos, bottom); @@ -604,13 +654,15 @@ lbox_socket_readline(struct lua_State *L) * the readahead size, then read new data. */ if (bottom == ibuf_size(in)) { - ssize_t nrd = coio_bread_timeout(&s->coio, &s->iob->in, 1, - delay); + ssize_t nrd = coio_bread_timeout(&s->io_r, &s->iob->in, 1, + delay); /* case #5: eof (step 1)*/ if (nrd == 0) { if (errno == ETIMEDOUT) - return bio_pushrecverror(L, s, ETIMEDOUT); - return bio_pusheof(L, s); + rc = bio_pushrecverror(L, s, ETIMEDOUT); + else + rc = bio_pusheof(L, s); + return rc; } } @@ -622,7 +674,10 @@ lbox_socket_readline(struct lua_State *L) evio_timeout_update(start, &delay); } } @catch (SocketError *e) { - return bio_pushrecverror(L, s, errno); + rc = bio_pushrecverror(L, s, errno); + return rc; + } @finally { + mutex_unlock(&s->io_r_mutex); } /* case #1: success, separator matched */ @@ -651,7 +706,7 @@ lbox_socket_bind(struct lua_State *L) double timeout = TIMEOUT_INFINITY; if (lua_gettop(L) == 4) timeout = luaL_checknumber(L, 4); - if (evio_is_active(&s->coio)) + if (evio_is_active(&s->io_w)) return bio_pusherror(L, s, EALREADY); bio_clearerr(s); /* try to resolve a hostname */ @@ -659,7 +714,7 @@ lbox_socket_bind(struct lua_State *L) if (ai == NULL) return bio_pusherror(L, s, errno); @try { - evio_bind_addrinfo(&s->coio, ai); + evio_bind_addrinfo(&s->io_w, ai); } @catch (SocketError *e) { /* case #2: error */ return bio_pusherror(L, s, errno); @@ -685,7 +740,7 @@ lbox_socket_listen(struct lua_State *L) { struct bio_socket *s = bio_checkactivesocket(L, 1); bio_clearerr(s); - if (listen(s->coio.fd, sio_listen_backlog())) + if (listen(s->io_w.fd, sio_listen_backlog())) return bio_pusherror(L, s, errno); lua_settop(L, 1); return 1; @@ -715,8 +770,9 @@ lbox_socket_accept(struct lua_State *L) bio_pushsocket(L, SOCK_STREAM); struct bio_socket *client = lua_touserdata(L, -1); @try { - client->coio.fd = coio_accept(&s->coio, (struct sockaddr_in*)&addr, - sizeof(addr), timeout); + client->io_w.fd = coio_accept(&s->io_w, (struct sockaddr_in*)&addr, + sizeof(addr), timeout); + client->io_r.fd = client->io_w.fd; } @catch (SocketError *e) { return bio_pusherror(L, s, errno); } @@ -783,9 +839,9 @@ lbox_socket_sendto(struct lua_State *L) size_t nwr; @try { /* maybe init the socket */ - if (! evio_is_active(&s->coio)) - evio_socket(&s->coio, addr->sa_family, s->socktype, 0); - nwr = coio_sendto_timeout(&s->coio, buf, buf_size, 0, + if (! evio_is_active(&s->io_w)) + evio_socket(&s->io_w, addr->sa_family, s->socktype, 0); + nwr = coio_sendto_timeout(&s->io_w, buf, buf_size, 0, (struct sockaddr_in*)addr, addrlen, delay); } @catch (SocketError *e) { /* case #2-3: error or timeout */ @@ -832,7 +888,7 @@ lbox_socket_recvfrom(struct lua_State *L) size_t nrd; @try { ibuf_reserve(in, buf_size); - nrd = coio_recvfrom_timeout(&s->coio, in->pos, buf_size, 0, + nrd = coio_recvfrom_timeout(&s->io_w, in->pos, buf_size, 0, (struct sockaddr_in*)&addr, sizeof(addr), timeout); } @catch (SocketError *e) { diff --git a/src/tarantool.m b/src/tarantool.m index 75b76036b90a079188e5c5a452d7300ce84f86db..229785c78be71d92dca764710e59cb21a8104ddc 100644 --- a/src/tarantool.m +++ b/src/tarantool.m @@ -50,7 +50,7 @@ #include <fiber.h> #include <coeio.h> #include <iproto.h> -#include <latch.h> +#include "mutex.h" #include <recovery.h> #include <crc32.h> #include <palloc.h> @@ -211,15 +211,15 @@ core_reload_config(const struct tarantool_cfg *old_conf, i32 reload_cfg(struct tbuf *out) { - static struct tnt_latch *latch = NULL; + static struct mutex *mutex = NULL; struct tarantool_cfg new_cfg, aux_cfg; - if (latch == NULL) { - latch = palloc(eter_pool, sizeof(*latch)); - tnt_latch_create(latch); + if (mutex == NULL) { + mutex = palloc(eter_pool, sizeof(*mutex)); + mutex_create(mutex); } - if (tnt_latch_trylock(latch) == -1) { + if (mutex_trylock(mutex) == true) { out_warning(0, "Could not reload configuration: it is being reloaded right now"); tbuf_append(out, cfg_out->data, cfg_out->size); @@ -276,9 +276,8 @@ reload_cfg(struct tbuf *out) if (cfg_out->size != 0) tbuf_append(out, cfg_out->data, cfg_out->size); - tnt_latch_unlock(latch); + mutex_unlock(mutex); } - return 0; } diff --git a/test/box/socket.result b/test/box/socket.result index 61e50ff231a9f9f1ce08ec68e2911d5ffcf4ed1f..c99e5ee6f02d2ea3786fbd62202131d6c0ac9965 100644 --- a/test/box/socket.result +++ b/test/box/socket.result @@ -907,7 +907,7 @@ ping lua s:close() --- ... -lua reps = 0 function bug1160869() local s = box.socket.tcp() s:connect('127.0.0.1', box.cfg.primary_port) box.fiber.resume( box.fiber.create(function() box.fiber.detach() while true do s:recv(12) reps = reps + 1 end end) ) return s:send(box.pack('iii', 65280, 0, 1)) end +lua replies = 0 function bug1160869() local s = box.socket.tcp() s:connect('127.0.0.1', box.cfg.primary_port) box.fiber.resume( box.fiber.create(function() box.fiber.detach() while true do s:recv(12) replies = replies + 1 end end) ) return s:send(box.pack('iii', 65280, 0, 1)) end --- ... lua bug1160869() @@ -922,6 +922,25 @@ lua bug1160869() --- - 12 ... +lua replies +--- + - 3 +... +lua s = nil syncno = 0 reps = 0 function iostart() if s ~= nil then return end s = box.socket.tcp() s:connect('127.0.0.1', box.cfg.primary_port) box.fiber.resume( box.fiber.create(function() box.fiber.detach() while true do s:recv(12) reps = reps + 1 end end)) end function iotest() iostart() syncno = syncno + 1 return s:send(box.pack('iii', 65280, 0, syncno)) end +--- +... +lua iotest() +--- + - 12 +... +lua iotest() +--- + - 12 +... +lua iotest() +--- + - 12 +... lua reps --- - 3 diff --git a/test/box/socket.test b/test/box/socket.test index 0d64817b3b43924ed67949f65c5da8e382ab0220..1eaf57de9c26ec65313f556bbd94c627ccfc5fa6 100644 --- a/test/box/socket.test +++ b/test/box/socket.test @@ -514,7 +514,7 @@ exec admin "lua s:close()" # (https://bugs.launchpad.net/tarantool/+bug/1160869) # test=""" -reps = 0 +replies = 0 function bug1160869() local s = box.socket.tcp() s:connect('127.0.0.1', box.cfg.primary_port) @@ -522,7 +522,7 @@ function bug1160869() box.fiber.detach() while true do s:recv(12) - reps = reps + 1 + replies = replies + 1 end end) ) return s:send(box.pack('iii', 65280, 0, 1)) @@ -532,4 +532,35 @@ exec admin "lua " + test.replace('\n', ' ') exec admin "lua bug1160869()" exec admin "lua bug1160869()" exec admin "lua bug1160869()" +exec admin "lua replies" + +test=""" +s = nil +syncno = 0 +reps = 0 +function iostart() + if s ~= nil then + return + end + s = box.socket.tcp() + s:connect('127.0.0.1', box.cfg.primary_port) + box.fiber.resume( box.fiber.create(function() + box.fiber.detach() + while true do + s:recv(12) + reps = reps + 1 + end + end)) +end + +function iotest() + iostart() + syncno = syncno + 1 + return s:send(box.pack('iii', 65280, 0, syncno)) +end +""" +exec admin "lua " + test.replace('\n', ' ') +exec admin "lua iotest()" +exec admin "lua iotest()" +exec admin "lua iotest()" exec admin "lua reps"