From 44a8f3639b576549b2647af2012c5883483b482a Mon Sep 17 00:00:00 2001 From: Konstantin Osipov <kostja@tarantool.org> Date: Tue, 2 Apr 2013 23:21:26 +0400 Subject: [PATCH] https://bugs.launchpad.net/tarantool/+bug/1160869, code review Review comments. --- include/latch.h | 72 ----------------------------- include/mutex.h | 88 +++++++++++++++++++++++++++-------- src/CMakeLists.txt | 1 - src/latch.m | 71 ----------------------------- src/lua/lua_socket.m | 106 ++++++++++++++++++++++--------------------- src/tarantool.m | 15 +++--- 6 files changed, 130 insertions(+), 223 deletions(-) delete mode 100644 include/latch.h delete mode 100644 src/latch.m diff --git a/include/latch.h b/include/latch.h deleted file mode 100644 index 438bd41a22..0000000000 --- a/include/latch.h +++ /dev/null @@ -1,72 +0,0 @@ -#ifndef TARANTOOL_LATCH_H_INCLUDED -#define TARANTOOL_LATCH_H_INCLUDED -/* - * Redistribution and use in source and binary forms, with or - * without modification, are permitted provided that the following - * conditions are met: - * - * 1. Redistributions of source code must retain the above - * copyright notice, this list of conditions and the - * following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following - * disclaimer in the documentation and/or other materials - * provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED - * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL - * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, - * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL - * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR - * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF - * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF - * SUCH DAMAGE. - */ -#include <stdbool.h> - -struct fiber; - -/* - * Internal implementation of a container for a mutex like object - * with similar interface. It's used boolean variable because of - * single threaded nature of tarantool. But it's rather simple to change - * this variable to a mutex object to maintain multi threaded approach. - */ -struct tnt_latch { - bool locked; - - struct fiber *owner; -}; - -/** - * Initialize the given latch. - * - * @param latch Latch to be initialized. - */ -void tnt_latch_create(struct tnt_latch *latch); -/** - * Destroy the given latch. - */ -void tnt_latch_destroy(struct tnt_latch *latch); -/** - * Set the latch to the locked state. If it's already locked - * returns -1 value immediately otherwise returns 0. - * - * @param latch Latch to be locked. - */ -int tnt_latch_trylock(struct tnt_latch *latch); -/** - * Unlock the locked latch. - * - * @param latch Latch to be unlocked. - */ -void tnt_latch_unlock(struct tnt_latch *latch); - - -#endif /* TARANTOOL_LATCH_H_INCLUDED */ diff --git a/include/mutex.h b/include/mutex.h index 6069685d9e..eca995334b 100644 --- a/include/mutex.h +++ b/include/mutex.h @@ -32,33 +32,58 @@ #include <assert.h> #include <rlist.h> -struct mutex { - struct rlist q; +/** Mutex of cooperative multitasking environment. */ + +struct mutex +{ + /** + * The queue of fibers waiting on a mutex. + * The first fiber owns the mutex. + */ + struct rlist queue; }; +/** + * Initialize the given mutex. + * + * @param m mutex to be initialized. + */ static inline void -mutex_init(struct mutex *m) { - rlist_create(&m->q); +mutex_create(struct mutex *m) +{ + rlist_create(&m->queue); } static inline void -mutex_destroy(struct mutex *m) { - struct fiber *f; - while (!rlist_empty(&m->q)) { - f = rlist_first_entry(&m->q, struct fiber, state); +mutex_destroy(struct mutex *m) +{ + while (!rlist_empty(&m->queue)) { + struct fiber *f = rlist_first_entry(&m->queue, + struct fiber, state); rlist_del_entry(f, state); } - rlist_create(&m->q); } +/** + * Lock a mutex. If the mutex is already locked by another fiber, + * waits for timeout. + * + * @param m mutex to be locked. + * + * @retval false success + * @retval true timeout + */ static inline bool -mutex_lock_timeout(struct mutex *m, ev_tstamp timeout) { - rlist_add_tail_entry(&m->q, fiber, state); +mutex_lock_timeout(struct mutex *m, ev_tstamp timeout) +{ + rlist_add_tail_entry(&m->queue, fiber, state); ev_tstamp start = timeout; while (timeout > 0) { - struct fiber *f = rlist_first_entry(&m->q, struct fiber, state); + struct fiber *f = rlist_first_entry(&m->queue, + struct fiber, state); if (f == fiber) break; + fiber_yield_timeout(timeout); timeout -= ev_now() - start; if (timeout <= 0) { @@ -70,21 +95,46 @@ mutex_lock_timeout(struct mutex *m, ev_tstamp timeout) { return false; } +/** + * Lock a mutex (no timeout). Waits indefinitely until + * the current fiber can gain access to the mutex. + */ +static inline void +mutex_lock(struct mutex *m) +{ + (void) mutex_lock_timeout(m, TIMEOUT_INFINITY); +} + +/** + * Try to lock a mutex. Return immediately if the mutex is locked. + * @retval false success + * @retval true the mutex is locked. + */ static inline bool -mutex_lock(struct mutex *m) { - return mutex_lock_timeout(m, TIMEOUT_INFINITY); +mutex_trylock(struct mutex *m) +{ + if (rlist_empty(&m->queue)) { + mutex_lock(m); + return false; + } + return true; } +/** + * Unlock a mutex. The fiber calling this function must + * own the mutex. + */ static inline void -mutex_unlock(struct mutex *m) { +mutex_unlock(struct mutex *m) +{ struct fiber *f; - f = rlist_first_entry(&m->q, struct fiber, state); + f = rlist_first_entry(&m->queue, struct fiber, state); assert(f == fiber); rlist_del_entry(f, state); - if (!rlist_empty(&m->q)) { - f = rlist_first_entry(&m->q, struct fiber, state); + if (!rlist_empty(&m->queue)) { + f = rlist_first_entry(&m->queue, struct fiber, state); fiber_wakeup(f); } } -#endif +#endif /* TARANTOOL_MUTEX_H_INCLUDED */ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ae8b99a0d6..9a3b886393 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -94,7 +94,6 @@ set (common_sources exception.m errcode.c errinj.m - latch.m fio.c crc32.c rope.c diff --git a/src/latch.m b/src/latch.m deleted file mode 100644 index ff08a1ac04..0000000000 --- a/src/latch.m +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Redistribution and use in source and binary forms, with or - * without modification, are permitted provided that the following - * conditions are met: - * - * 1. Redistributions of source code must retain the above - * copyright notice, this list of conditions and the - * following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following - * disclaimer in the documentation and/or other materials - * provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED - * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL - * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, - * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL - * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR - * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF - * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF - * SUCH DAMAGE. - */ -#include "latch.h" -#include "fiber.h" - -void -tnt_latch_create(struct tnt_latch *latch) -{ - latch->locked = false; - latch->owner = NULL; -} - -void -tnt_latch_destroy(struct tnt_latch *latch) -{ - assert(latch->locked == false); - - latch->owner = NULL; -} - -int -tnt_latch_trylock(struct tnt_latch *latch) -{ - if (latch->locked) { - assert(latch->owner != fiber); - - return -1; - } - - assert(latch->owner == NULL); - - latch->locked = true; - latch->owner = fiber; - - return 0; -} - -void -tnt_latch_unlock(struct tnt_latch *latch) -{ - assert(latch->owner == fiber); - - latch->locked = false; - latch->owner = NULL; -} diff --git a/src/lua/lua_socket.m b/src/lua/lua_socket.m index db4b39a6ba..c341be5ed9 100644 --- a/src/lua/lua_socket.m +++ b/src/lua/lua_socket.m @@ -67,8 +67,10 @@ enum bio_status { }; struct bio_socket { - struct ev_io crd, cwr; - struct mutex lrd, lwr; + struct ev_io io_r; + struct ev_io io_w; + struct mutex io_r_mutex; + struct mutex io_w_mutex; struct iobuf *iob; /** SOCK_DGRAM or SOCK_STREAM */ int socktype; @@ -81,13 +83,13 @@ bio_pushsocket(struct lua_State *L, int socktype) struct bio_socket *s = lua_newuserdata(L, sizeof(struct bio_socket)); luaL_getmetatable(L, socketlib_name); lua_setmetatable(L, -2); - coio_init(&s->crd); - coio_init(&s->cwr); + coio_init(&s->io_r); + coio_init(&s->io_w); s->socktype = socktype; s->iob = NULL; s->error = 0; - mutex_init(&s->lrd); - mutex_init(&s->lwr); + mutex_create(&s->io_r_mutex); + mutex_create(&s->io_w_mutex); /* * Do not create a file descriptor yet. Thanks to ipv6, * socket family is not known until host name is resolved. @@ -109,7 +111,7 @@ static inline struct bio_socket * bio_checkactivesocket(struct lua_State *L, int narg) { struct bio_socket *s = bio_checksocket(L, narg); - if (! evio_is_active(&s->cwr)) + if (! evio_is_active(&s->io_w)) luaL_error(L, "box.socket: socket is not initialized"); return s; } @@ -133,7 +135,7 @@ bio_initbuf(struct bio_socket *s) char name[PALLOC_POOL_NAME_MAXLEN]; const char *type = s->socktype == SOCK_STREAM ? "tcp" : "udp"; snprintf(name, sizeof(name), "box.io.%s(%d)", - type, s->cwr.fd); + type, s->io_w.fd); s->iob = iobuf_new(name); } @@ -202,7 +204,7 @@ static int lbox_socket_tostring(struct lua_State *L) { struct bio_socket *s = bio_checksocket(L, -1); - lua_pushstring(L, sio_socketname(s->cwr.fd)); + lua_pushstring(L, sio_socketname(s->io_w.fd)); return 1; } @@ -238,15 +240,18 @@ static int lbox_socket_close(struct lua_State *L) { struct bio_socket *s = bio_checksocket(L, -1); - if (! evio_is_active(&s->cwr)) + if (! evio_is_active(&s->io_w)) return 0; if (s->iob) { iobuf_delete(s->iob); s->iob = NULL; } - evio_close(&s->cwr); - mutex_destroy(&s->lrd); - mutex_destroy(&s->lwr); + ev_io_stop(&s->io_r); + evio_close(&s->io_w); + s->io_r.fd = s->io_w.fd; + assert(s->io_r.fd == -1); + mutex_destroy(&s->io_r_mutex); + mutex_destroy(&s->io_w_mutex); bio_clearerr(s); return 0; } @@ -265,7 +270,7 @@ lbox_socket_shutdown(struct lua_State *L) struct bio_socket *s = bio_checkactivesocket(L, -1); int how = luaL_checkint(L, 2); bio_clearerr(s); - if (shutdown(s->cwr.fd, how)) + if (shutdown(s->io_w.fd, how)) return bio_pushsockerror(L, s, errno); /* case #1: Success */ lua_settop(L, 1); @@ -302,7 +307,7 @@ lbox_socket_connect(struct lua_State *L) double timeout = TIMEOUT_INFINITY; if (lua_gettop(L) == 4) timeout = luaL_checknumber(L, 4); - if (evio_is_active(&s->cwr)) + if (evio_is_active(&s->io_w)) return bio_pushsockerror(L, s, EALREADY); bio_clearerr(s); @@ -316,10 +321,10 @@ lbox_socket_connect(struct lua_State *L) evio_timeout_update(start, &delay); @try { /* connect to a first available host */ - if (coio_connect_addrinfo(&s->cwr, ai, delay)) + if (coio_connect_addrinfo(&s->io_w, ai, delay)) return bio_pushsockerror(L, s, ETIMEDOUT); /* set coio reader socket */ - s->crd.fd = s->cwr.fd; + s->io_r.fd = s->io_w.fd; } @catch (SocketError *e) { return bio_pushsockerror(L, s, errno); } @finally { @@ -361,28 +366,27 @@ lbox_socket_send(struct lua_State *L) /* acquire write lock */ ev_tstamp start, delay; evio_timeout_init(&start, &delay, timeout); - bool tmout = mutex_lock_timeout(&s->lwr, delay); - if (tmout) + bool timed_out = mutex_lock_timeout(&s->io_w_mutex, delay); + if (timed_out) return bio_pushsenderror(L, s, 0, ETIMEDOUT); evio_timeout_update(start, &delay); int rc; @try { - ssize_t nwr = coio_write_timeout(&s->cwr, buf, buf_size, + ssize_t nwr = coio_write_timeout(&s->io_w, buf, buf_size, delay); if (nwr < buf_size) { rc = bio_pushsenderror(L, s, nwr, ETIMEDOUT); - mutex_unlock(&s->lwr); return rc; } } @catch (SocketError *e) { rc = bio_pushsenderror(L, s, 0, errno); - mutex_unlock(&s->lwr); return rc; + } @finally { + mutex_unlock(&s->io_w_mutex); } /* case #1: Success */ lua_pushinteger(L, buf_size); - mutex_unlock(&s->lwr); return 1; } @@ -412,14 +416,6 @@ lbox_socket_recv(struct lua_State *L) /* Clear possible old timeout status. */ bio_clearerr(s); - /* acquire read lock */ - ev_tstamp start, delay; - evio_timeout_init(&start, &delay, timeout); - bool tmout = mutex_lock_timeout(&s->lrd, delay); - if (tmout) - return bio_pushrecverror(L, s, ETIMEDOUT); - evio_timeout_update(start, &delay); - /* * Readahead buffer can contain sufficient amount of * data from the previous call to cover the required read @@ -431,16 +427,26 @@ lbox_socket_recv(struct lua_State *L) struct ibuf *in = &s->iob->in; ssize_t to_read = sz - ibuf_size(in); - int rc; if (to_read > 0) { + int rc; + /* acquire read lock */ + ev_tstamp start, delay; + evio_timeout_init(&start, &delay, timeout); + bool timed_out = mutex_lock_timeout(&s->io_r_mutex, delay); + if (timed_out) + return bio_pushrecverror(L, s, ETIMEDOUT); + evio_timeout_update(start, &delay); + to_read = sz - ibuf_size(in); + ssize_t nrd; @try { - nrd = coio_bread_timeout(&s->crd, in, to_read, + nrd = coio_bread_timeout(&s->io_r, in, to_read, delay); } @catch (SocketError *e) { rc = bio_pushrecverror(L, s, errno); - mutex_unlock(&s->lrd); return rc; + } @finally { + mutex_unlock(&s->io_r_mutex); } if (nrd < to_read) { /* timeout or EOF. */ @@ -448,13 +454,11 @@ lbox_socket_recv(struct lua_State *L) rc = bio_pushrecverror(L, s, ETIMEDOUT); else rc = bio_pusheof(L, s); - mutex_unlock(&s->lrd); return rc; } } lua_pushlstring(L, in->pos, sz); in->pos += sz; - mutex_unlock(&s->lrd); return 1; } @@ -619,8 +623,8 @@ lbox_socket_readline(struct lua_State *L) /* acquire read lock */ ev_tstamp start, delay; evio_timeout_init(&start, &delay, timeout); - bool tmout = mutex_lock_timeout(&s->lrd, delay); - if (tmout) + bool timed_out = mutex_lock_timeout(&s->io_r_mutex, delay); + if (timed_out) return bio_pushrecverror(L, s, ETIMEDOUT); evio_timeout_update(start, &delay); @@ -643,7 +647,6 @@ lbox_socket_readline(struct lua_State *L) lua_pushlstring(L, in->pos, bottom); s->iob->in.pos += bottom; bio_pushstatus(L, BIO_LIMIT); - mutex_unlock(&s->lrd); return 2; } @@ -651,7 +654,7 @@ lbox_socket_readline(struct lua_State *L) * the readahead size, then read new data. */ if (bottom == ibuf_size(in)) { - ssize_t nrd = coio_bread_timeout(&s->crd, &s->iob->in, 1, + ssize_t nrd = coio_bread_timeout(&s->io_r, &s->iob->in, 1, delay); /* case #5: eof (step 1)*/ if (nrd == 0) { @@ -659,7 +662,6 @@ lbox_socket_readline(struct lua_State *L) rc = bio_pushrecverror(L, s, ETIMEDOUT); else rc = bio_pusheof(L, s); - mutex_unlock(&s->lrd); return rc; } } @@ -673,8 +675,9 @@ lbox_socket_readline(struct lua_State *L) } } @catch (SocketError *e) { rc = bio_pushrecverror(L, s, errno); - mutex_unlock(&s->lrd); return rc; + } @finally { + mutex_unlock(&s->io_r_mutex); } /* case #1: success, separator matched */ @@ -682,7 +685,6 @@ lbox_socket_readline(struct lua_State *L) in->pos += bottom; lua_pushnil(L); lua_rawgeti(L, seplist, match + 1); - mutex_unlock(&s->lrd); return 3; } @@ -704,7 +706,7 @@ lbox_socket_bind(struct lua_State *L) double timeout = TIMEOUT_INFINITY; if (lua_gettop(L) == 4) timeout = luaL_checknumber(L, 4); - if (evio_is_active(&s->cwr)) + if (evio_is_active(&s->io_w)) return bio_pusherror(L, s, EALREADY); bio_clearerr(s); /* try to resolve a hostname */ @@ -712,7 +714,7 @@ lbox_socket_bind(struct lua_State *L) if (ai == NULL) return bio_pusherror(L, s, errno); @try { - evio_bind_addrinfo(&s->cwr, ai); + evio_bind_addrinfo(&s->io_w, ai); } @catch (SocketError *e) { /* case #2: error */ return bio_pusherror(L, s, errno); @@ -738,7 +740,7 @@ lbox_socket_listen(struct lua_State *L) { struct bio_socket *s = bio_checkactivesocket(L, 1); bio_clearerr(s); - if (listen(s->cwr.fd, sio_listen_backlog())) + if (listen(s->io_w.fd, sio_listen_backlog())) return bio_pusherror(L, s, errno); lua_settop(L, 1); return 1; @@ -768,9 +770,9 @@ lbox_socket_accept(struct lua_State *L) bio_pushsocket(L, SOCK_STREAM); struct bio_socket *client = lua_touserdata(L, -1); @try { - client->cwr.fd = coio_accept(&s->cwr, (struct sockaddr_in*)&addr, + client->io_w.fd = coio_accept(&s->io_w, (struct sockaddr_in*)&addr, sizeof(addr), timeout); - client->crd.fd = client->cwr.fd; + client->io_r.fd = client->io_w.fd; } @catch (SocketError *e) { return bio_pusherror(L, s, errno); } @@ -837,9 +839,9 @@ lbox_socket_sendto(struct lua_State *L) size_t nwr; @try { /* maybe init the socket */ - if (! evio_is_active(&s->cwr)) - evio_socket(&s->cwr, addr->sa_family, s->socktype, 0); - nwr = coio_sendto_timeout(&s->cwr, buf, buf_size, 0, + if (! evio_is_active(&s->io_w)) + evio_socket(&s->io_w, addr->sa_family, s->socktype, 0); + nwr = coio_sendto_timeout(&s->io_w, buf, buf_size, 0, (struct sockaddr_in*)addr, addrlen, delay); } @catch (SocketError *e) { /* case #2-3: error or timeout */ @@ -886,7 +888,7 @@ lbox_socket_recvfrom(struct lua_State *L) size_t nrd; @try { ibuf_reserve(in, buf_size); - nrd = coio_recvfrom_timeout(&s->cwr, in->pos, buf_size, 0, + nrd = coio_recvfrom_timeout(&s->io_w, in->pos, buf_size, 0, (struct sockaddr_in*)&addr, sizeof(addr), timeout); } @catch (SocketError *e) { diff --git a/src/tarantool.m b/src/tarantool.m index 2601f873e5..2d03bf9ed8 100644 --- a/src/tarantool.m +++ b/src/tarantool.m @@ -50,7 +50,7 @@ #include <fiber.h> #include <coeio.h> #include <iproto.h> -#include <latch.h> +#include "mutex.h" #include <recovery.h> #include <crc32.h> #include <palloc.h> @@ -211,15 +211,15 @@ core_reload_config(const struct tarantool_cfg *old_conf, i32 reload_cfg(struct tbuf *out) { - static struct tnt_latch *latch = NULL; + static struct mutex *mutex = NULL; struct tarantool_cfg new_cfg, aux_cfg; - if (latch == NULL) { - latch = palloc(eter_pool, sizeof(*latch)); - tnt_latch_create(latch); + if (mutex == NULL) { + mutex = palloc(eter_pool, sizeof(*mutex)); + mutex_create(mutex); } - if (tnt_latch_trylock(latch) == -1) { + if (mutex_trylock(mutex) == true) { out_warning(0, "Could not reload configuration: it is being reloaded right now"); tbuf_append(out, cfg_out->data, cfg_out->size); @@ -276,9 +276,8 @@ reload_cfg(struct tbuf *out) if (cfg_out->size != 0) tbuf_append(out, cfg_out->data, cfg_out->size); - tnt_latch_unlock(latch); + mutex_unlock(mutex); } - return 0; } -- GitLab