From 1f90a823aa0b628300fe1495caec7feea27afc11 Mon Sep 17 00:00:00 2001
From: Dmitry Simonenko <pmwkaa@gmail.com>
Date: Mon, 1 Apr 2013 19:05:10 +0400
Subject: [PATCH] socket-concurrent: implement exclusive per readers and
 writers socket access. (https://bugs.launchpad.net/tarantool/+bug/1160869)

---
 include/erwlock.h      | 137 +++++++++++++++++++++++++++++++++++++++++
 src/lua/lua_socket.m   |  87 +++++++++++++++++++++-----
 test/box/socket.result |  21 ++++++-
 test/box/socket.test   |  35 ++++++++++-
 4 files changed, 260 insertions(+), 20 deletions(-)
 create mode 100644 include/erwlock.h

diff --git a/include/erwlock.h b/include/erwlock.h
new file mode 100644
index 0000000000..7646e741d1
--- /dev/null
+++ b/include/erwlock.h
@@ -0,0 +1,137 @@
+#ifndef TARANTOOL_ERWLOCK_H_INCLUDED
+#define TARNATOOL_ERWLOCK_H_INCLUDED
+
+/*
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include <assert.h>
+#include <rlist.h>
+
+/* exclusive readers-writers locks */
+
+struct erwlock {
+	struct rlist readers, writers;
+};
+
+static inline void
+erwlock_init(struct erwlock *l) {
+	rlist_create(&l->readers);
+	rlist_create(&l->writers);
+}
+
+static inline void
+erwlock_destroy(struct erwlock *l) {
+	struct fiber *f;
+	while (!rlist_empty(&l->readers)) {
+		f = rlist_first_entry(&l->readers, struct fiber, state);
+		rlist_del_entry(f, state);
+	}
+	while (!rlist_empty(&l->writers)) {
+		f = rlist_first_entry(&l->writers, struct fiber, state);
+		rlist_del_entry(f, state);
+	}
+}
+
+static inline bool
+erwlock_lockq_timeout(struct rlist *q, ev_tstamp timeout) {
+	rlist_add_tail_entry(q, fiber, state);
+	ev_tstamp start = timeout;
+	while (timeout > 0) {
+		struct fiber *f = rlist_first_entry(q, struct fiber, state);
+		if (f == fiber)
+			break;
+		fiber_yield_timeout(timeout);
+		timeout -= ev_now() - start;
+		if (timeout <= 0) {
+			rlist_del_entry(fiber, state);
+			errno = ETIMEDOUT;
+			return true;
+		}
+	}
+	return false;
+}
+
+static inline void
+erwlock_unlockq(struct rlist *q) {
+	struct fiber *f;
+	f = rlist_first_entry(q, struct fiber, state);
+	assert(f == fiber);
+	rlist_del_entry(f, state);
+	if (!rlist_empty(q)) {
+		f = rlist_first_entry(q, struct fiber, state);
+		fiber_wakeup(f);
+	}
+}
+
+static inline bool
+erwlock_lockedq(struct rlist *q) {
+	return rlist_empty(q);
+}
+
+static inline bool
+erwlock_lockread_timeout(struct erwlock *l, ev_tstamp timeout) {
+	return erwlock_lockq_timeout(&l->readers, timeout);
+}
+
+static inline bool
+erwlock_lockread(struct erwlock *l) {
+	return erwlock_lockread_timeout(l, TIMEOUT_INFINITY);
+}
+
+static inline void
+erwlock_unlockread(struct erwlock *l) {
+	erwlock_unlockq(&l->readers);
+}
+
+static inline bool
+erwlock_lockedread(struct erwlock *l) {
+	return erwlock_lockedq(&l->readers);
+}
+
+static inline bool
+erwlock_lockwrite_timeout(struct erwlock *l, ev_tstamp timeout) {
+	return erwlock_lockq_timeout(&l->writers, timeout);
+}
+
+static inline bool
+erwlock_lockwrite(struct erwlock *l) {
+	return erwlock_lockwrite_timeout(l, TIMEOUT_INFINITY);
+}
+
+static inline void
+erwlock_unlockwrite(struct erwlock *l) {
+	erwlock_unlockq(&l->writers);
+}
+
+static inline bool
+erwlock_lockedwrite(struct erwlock *l) {
+	return erwlock_lockedq(&l->writers);
+}
+
+#endif
diff --git a/src/lua/lua_socket.m b/src/lua/lua_socket.m
index e49e7b6397..fb290513ec 100644
--- a/src/lua/lua_socket.m
+++ b/src/lua/lua_socket.m
@@ -47,6 +47,7 @@
 #include "tbuf.h"
 #include <lua/init.h>
 #include <stdlib.h>
+#include <erwlock.h>
 
 static const char socketlib_name[] = "box.socket";
 
@@ -66,7 +67,8 @@ enum bio_status {
 };
 
 struct bio_socket {
-	struct ev_io coio;
+	struct ev_io coio, coiord;
+	struct erwlock lock;
 	struct iobuf *iob;
 	/** SOCK_DGRAM or SOCK_STREAM */
 	int socktype;
@@ -79,10 +81,12 @@ bio_pushsocket(struct lua_State *L, int socktype)
 	struct bio_socket *s = lua_newuserdata(L, sizeof(struct bio_socket));
 	luaL_getmetatable(L, socketlib_name);
 	lua_setmetatable(L, -2);
+	coio_init(&s->coiord);
 	coio_init(&s->coio);
 	s->socktype = socktype;
 	s->iob = NULL;
 	s->error = 0;
+	erwlock_init(&s->lock);
 	/*
 	 * Do not create a file descriptor yet. Thanks to ipv6,
 	 * socket family is not known until host name is resolved.
@@ -240,6 +244,7 @@ lbox_socket_close(struct lua_State *L)
 		s->iob = NULL;
 	}
 	evio_close(&s->coio);
+	erwlock_destroy(&s->lock);
 	bio_clearerr(s);
 	return 0;
 }
@@ -311,6 +316,8 @@ lbox_socket_connect(struct lua_State *L)
 		/* connect to a first available host */
 		if (coio_connect_addrinfo(&s->coio, ai, delay))
 			return bio_pushsockerror(L, s, ETIMEDOUT);
+		/* set coio reader socket */
+		s->coiord.fd = s->coio.fd;
 	} @catch (SocketError *e) {
 		return bio_pushsockerror(L, s, errno);
 	} @finally {
@@ -348,16 +355,32 @@ lbox_socket_send(struct lua_State *L)
 	if (s->iob == NULL)
 		return bio_pushsenderror(L, s, 0, ENOTCONN);
 	bio_clearerr(s);
+
+	/* acquire write lock */
+	ev_tstamp start, delay;
+	evio_timeout_init(&start, &delay, timeout);
+	bool tmout = erwlock_lockwrite_timeout(&s->lock, delay);
+	if (tmout)
+		return bio_pushsenderror(L, s, 0, ETIMEDOUT);
+	evio_timeout_update(start, &delay);
+
+	int rc;
 	@try {
 		ssize_t nwr = coio_write_timeout(&s->coio, buf, buf_size,
-						 timeout);
-		if (nwr < buf_size)
-			return bio_pushsenderror(L, s, nwr, ETIMEDOUT);
+						 delay);
+		if (nwr < buf_size) {
+			rc = bio_pushsenderror(L, s, nwr, ETIMEDOUT);
+			erwlock_unlockwrite(&s->lock);
+			return rc;
+		}
 	} @catch (SocketError *e) {
-		return bio_pushsenderror(L, s, 0, errno);
+		rc = bio_pushsenderror(L, s, 0, errno);
+		erwlock_unlockwrite(&s->lock);
+		return rc;
 	}
 	/* case #1: Success */
 	lua_pushinteger(L, buf_size);
+	erwlock_unlockwrite(&s->lock);
 	return 1;
 }
 
@@ -386,6 +409,15 @@ lbox_socket_recv(struct lua_State *L)
 		return bio_pushrecverror(L, s, ENOTCONN);
 	/* Clear possible old timeout status. */
 	bio_clearerr(s);
+
+	/* acquire read lock */
+	ev_tstamp start, delay;
+	evio_timeout_init(&start, &delay, timeout);
+	bool tmout = erwlock_lockread_timeout(&s->lock, delay);
+	if (tmout)
+		return bio_pushrecverror(L, s, ETIMEDOUT);
+	evio_timeout_update(start, &delay);
+
 	/*
 	 * Readahead buffer can contain sufficient amount of
 	 * data from the previous call to cover the required read
@@ -397,23 +429,30 @@ lbox_socket_recv(struct lua_State *L)
 	struct ibuf *in = &s->iob->in;
 	ssize_t to_read = sz - ibuf_size(in);
 
+	int rc;
 	if (to_read > 0) {
 		ssize_t nrd;
 		@try {
-			nrd = coio_bread_timeout(&s->coio, in, to_read,
-						 timeout);
+			nrd = coio_bread_timeout(&s->coiord, in, to_read,
+						 delay);
 		} @catch (SocketError *e) {
-			return bio_pushrecverror(L, s, errno);
+			rc = bio_pushrecverror(L, s, errno);
+			erwlock_unlockread(&s->lock);
+			return rc;
 		}
 		if (nrd < to_read) {
 			/*  timeout or EOF. */
 			if (errno == ETIMEDOUT)
-				return bio_pushrecverror(L, s, ETIMEDOUT);
-			return bio_pusheof(L, s);
+				rc = bio_pushrecverror(L, s, ETIMEDOUT);
+			else
+				rc = bio_pusheof(L, s);
+			erwlock_unlockread(&s->lock);
+			return rc;
 		}
 	}
 	lua_pushlstring(L, in->pos, sz);
 	in->pos += sz;
+	erwlock_unlockread(&s->lock);
 	return 1;
 }
 
@@ -575,9 +614,18 @@ lbox_socket_readline(struct lua_State *L)
 	if (rs_size == 0)
 		luaL_error(L, "box.io.readline: bad separator table");
 
+	/* acquire read lock */
+	ev_tstamp start, delay;
+	evio_timeout_init(&start, &delay, timeout);
+	bool tmout = erwlock_lockread_timeout(&s->lock, delay);
+	if (tmout)
+		return bio_pushrecverror(L, s, ETIMEDOUT);
+	evio_timeout_update(start, &delay);
+
 	size_t bottom = 0;
 	int match;
 	struct ibuf *in = &s->iob->in;
+	int rc;
 
 	@try {
 		/* readline implementation uses a simple state machine
@@ -587,9 +635,6 @@ lbox_socket_readline(struct lua_State *L)
 			palloc(in->pool, sizeof(struct readline_state) * rs_size);
 		readline_state_init(L, rs, seplist);
 
-		ev_tstamp start, delay;
-		evio_timeout_init(&start, &delay, timeout);
-
 		while (1) {
 
 			/* case #4: user limit reached */
@@ -597,6 +642,7 @@ lbox_socket_readline(struct lua_State *L)
 				lua_pushlstring(L, in->pos, bottom);
 				s->iob->in.pos += bottom;
 				bio_pushstatus(L, BIO_LIMIT);
+				erwlock_unlockread(&s->lock);
 				return 2;
 			}
 
@@ -604,13 +650,16 @@ lbox_socket_readline(struct lua_State *L)
 			 * the readahead size, then read new data. */
 			if (bottom == ibuf_size(in)) {
 
-				ssize_t nrd = coio_bread_timeout(&s->coio, &s->iob->in, 1,
+				ssize_t nrd = coio_bread_timeout(&s->coiord, &s->iob->in, 1,
 						                 delay);
 				/* case #5: eof (step 1)*/
 				if (nrd == 0) {
 					if (errno == ETIMEDOUT)
-						return bio_pushrecverror(L, s, ETIMEDOUT);
-					return bio_pusheof(L, s);
+						rc = bio_pushrecverror(L, s, ETIMEDOUT);
+					else
+						rc = bio_pusheof(L, s);
+					erwlock_unlockread(&s->lock);
+					return rc;
 				}
 			}
 
@@ -622,7 +671,9 @@ lbox_socket_readline(struct lua_State *L)
 			evio_timeout_update(start, &delay);
 		}
 	} @catch (SocketError *e) {
-		return bio_pushrecverror(L, s, errno);
+		rc = bio_pushrecverror(L, s, errno);
+		erwlock_unlockread(&s->lock);
+		return rc;
 	}
 
 	/* case #1: success, separator matched */
@@ -630,6 +681,7 @@ lbox_socket_readline(struct lua_State *L)
 	in->pos += bottom;
 	lua_pushnil(L);
 	lua_rawgeti(L, seplist, match + 1);
+	erwlock_unlockread(&s->lock);
 	return 3;
 }
 
@@ -717,6 +769,7 @@ lbox_socket_accept(struct lua_State *L)
 	@try {
 		client->coio.fd = coio_accept(&s->coio, (struct sockaddr_in*)&addr,
 		                              sizeof(addr), timeout);
+		client->coiord.fd = client->coio.fd;
 	} @catch (SocketError *e) {
 		return bio_pusherror(L, s, errno);
 	}
diff --git a/test/box/socket.result b/test/box/socket.result
index 61e50ff231..c99e5ee6f0 100644
--- a/test/box/socket.result
+++ b/test/box/socket.result
@@ -907,7 +907,7 @@ ping
 lua s:close()
 ---
 ...
-lua  reps = 0 function bug1160869() 	local s = box.socket.tcp() 	s:connect('127.0.0.1', box.cfg.primary_port) 	box.fiber.resume( box.fiber.create(function() 		box.fiber.detach() 		while true do 			s:recv(12) 			reps = reps + 1 		end 	end) ) 	return s:send(box.pack('iii', 65280, 0, 1)) end 
+lua  replies = 0 function bug1160869() 	local s = box.socket.tcp() 	s:connect('127.0.0.1', box.cfg.primary_port) 	box.fiber.resume( box.fiber.create(function() 		box.fiber.detach() 		while true do 			s:recv(12) 			replies = replies + 1 		end 	end) ) 	return s:send(box.pack('iii', 65280, 0, 1)) end 
 ---
 ...
 lua bug1160869()
@@ -922,6 +922,25 @@ lua bug1160869()
 ---
  - 12
 ...
+lua replies
+---
+ - 3
+...
+lua  s = nil syncno = 0 reps = 0 function iostart() 	if s ~= nil then 		return 	end 	s = box.socket.tcp() 	s:connect('127.0.0.1', box.cfg.primary_port) 	box.fiber.resume( box.fiber.create(function() 		box.fiber.detach() 		while true do 			s:recv(12) 			reps = reps + 1 		end 	end)) end  function iotest() 	iostart() 	syncno = syncno + 1 	return s:send(box.pack('iii', 65280, 0, syncno)) end 
+---
+...
+lua iotest()
+---
+ - 12
+...
+lua iotest()
+---
+ - 12
+...
+lua iotest()
+---
+ - 12
+...
 lua reps
 ---
  - 3
diff --git a/test/box/socket.test b/test/box/socket.test
index 0d64817b3b..1eaf57de9c 100644
--- a/test/box/socket.test
+++ b/test/box/socket.test
@@ -514,7 +514,7 @@ exec admin "lua s:close()"
 # (https://bugs.launchpad.net/tarantool/+bug/1160869)
 #
 test="""
-reps = 0
+replies = 0
 function bug1160869()
 	local s = box.socket.tcp()
 	s:connect('127.0.0.1', box.cfg.primary_port)
@@ -522,7 +522,7 @@ function bug1160869()
 		box.fiber.detach()
 		while true do
 			s:recv(12)
-			reps = reps + 1
+			replies = replies + 1
 		end
 	end) )
 	return s:send(box.pack('iii', 65280, 0, 1))
@@ -532,4 +532,35 @@ exec admin "lua " + test.replace('\n', ' ')
 exec admin "lua bug1160869()"
 exec admin "lua bug1160869()"
 exec admin "lua bug1160869()"
+exec admin "lua replies"
+
+test="""
+s = nil
+syncno = 0
+reps = 0
+function iostart()
+	if s ~= nil then
+		return
+	end
+	s = box.socket.tcp()
+	s:connect('127.0.0.1', box.cfg.primary_port)
+	box.fiber.resume( box.fiber.create(function()
+		box.fiber.detach()
+		while true do
+			s:recv(12)
+			reps = reps + 1
+		end
+	end))
+end
+
+function iotest()
+	iostart()
+	syncno = syncno + 1
+	return s:send(box.pack('iii', 65280, 0, syncno))
+end
+"""
+exec admin "lua " + test.replace('\n', ' ')
+exec admin "lua iotest()"
+exec admin "lua iotest()"
+exec admin "lua iotest()"
 exec admin "lua reps"
-- 
GitLab