Skip to content
Snippets Groups Projects
Commit 95688bb4 authored by Roman Tsisyk's avatar Roman Tsisyk Committed by Roman Tsisyk
Browse files

Extract ipc_cond into a separate file

No semantic changes.

Needed for #1451
parent d9aacede
No related branches found
No related tags found
No related merge requests found
......@@ -116,6 +116,7 @@ set (server_sources
coio_buf.cc
pickle.c
ipc.c
fiber_cond.c
latch.c
errinj.c
fio.c
......@@ -136,6 +137,7 @@ set (server_sources
lua/digest.c
lua/init.c
lua/fiber.c
lua/fiber_cond.c
lua/trigger.c
lua/ipc.c
lua/msgpack.c
......
......@@ -36,7 +36,7 @@
#include <small/rlist.h>
#include "ipc.h"
#include "fiber_cond.h"
#include "iterator_type.h"
#include "vy_stmt.h" /* for comparators */
#include "vy_stmt_iterator.h" /* struct vy_stmt_iterator */
......
......@@ -33,8 +33,8 @@
#include <zstd.h>
#include "fiber.h"
#include "fiber_cond.h"
#include "fio.h"
#include "ipc.h"
#include "cbus.h"
#include "memory.h"
......
......@@ -34,7 +34,7 @@
#include <stdint.h>
#include <stdbool.h>
#include "ipc.h"
#include "fiber_cond.h"
#include "iterator_type.h"
#include "vy_stmt.h" /* for comparators */
#include "vy_stmt_iterator.h" /* struct vy_stmt_iterator */
......
......@@ -31,8 +31,8 @@
* SUCH DAMAGE.
*/
#include "fiber.h"
#include "fiber_cond.h"
#include "rmean.h"
#include "ipc.h"
#include "small/rlist.h"
#include "salad/stailq.h"
......
/*
* Copyright 2010-2017, Tarantool AUTHORS, please see AUTHORS file.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* 1. Redistributions of source code must retain the above
* copyright notice, this list of conditions and the
* following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#include "fiber_cond.h"
#include <tarantool_ev.h>
#include "fiber.h"
void
ipc_cond_create(struct ipc_cond *c)
{
rlist_create(&c->waiters);
}
void
ipc_cond_destroy(struct ipc_cond *c)
{
(void)c;
assert(rlist_empty(&c->waiters));
}
void
ipc_cond_signal(struct ipc_cond *e)
{
if (! rlist_empty(&e->waiters)) {
struct fiber *f;
f = rlist_shift_entry(&e->waiters, struct fiber, state);
fiber_wakeup(f);
}
}
void
ipc_cond_broadcast(struct ipc_cond *e)
{
while (! rlist_empty(&e->waiters)) {
struct fiber *f;
f = rlist_shift_entry(&e->waiters, struct fiber, state);
fiber_wakeup(f);
}
}
int
ipc_cond_wait_timeout(struct ipc_cond *c, double timeout)
{
struct fiber *f = fiber();
rlist_add_tail_entry(&c->waiters, f, state);
if (fiber_yield_timeout(timeout)) {
diag_set(TimedOut);
return -1;
}
return 0;
}
int
ipc_cond_wait(struct ipc_cond *c)
{
return ipc_cond_wait_timeout(c, TIMEOUT_INFINITY);
}
#ifndef TARANTOOL_FIBER_COND_H_INCLUDED
#define TARANTOOL_FIBER_COND_H_INCLUDED 1
/*
* Copyright 2010-2017, Tarantool AUTHORS, please see AUTHORS file.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* 1. Redistributions of source code must retain the above
* copyright notice, this list of conditions and the
* following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#include <small/rlist.h>
#if defined(__cplusplus)
extern "C" {
#endif /* defined(__cplusplus) */
struct ipc_cond {
struct rlist waiters;
};
/**
* Initialize a cond - semantics as in POSIX condition variable.
*/
void
ipc_cond_create(struct ipc_cond *c);
/**
* Finalize a cond. UB if there are fibers waiting for a cond.
*/
void
ipc_cond_destroy(struct ipc_cond *c);
/**
* Wake one fiber waiting for the cond.
* Does nothing if no one is waiting.
*/
void
ipc_cond_signal(struct ipc_cond *c);
/**
* Wake all fibers waiting for the cond.
*/
void
ipc_cond_broadcast(struct ipc_cond *c);
int
ipc_cond_wait_timeout(struct ipc_cond *c, double timeout);
int
ipc_cond_wait(struct ipc_cond *c);
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
#endif /* TARANTOOL_FIBER_COND_H_INCLUDED */
......@@ -33,9 +33,10 @@
#include <small/ibuf.h>
#include <small/region.h>
#include <small/mempool.h>
#include <tarantool_ev.h>
#include "diag.h"
#include "ipc.h"
#include "fiber_cond.h"
/** {{{ Environment */
......
......@@ -483,47 +483,4 @@ ipc_channel_get_msg_timeout(struct ipc_channel *ch,
}
}
void
ipc_cond_create(struct ipc_cond *c)
{
rlist_create(&c->waiters);
}
void
ipc_cond_destroy(struct ipc_cond *c)
{
(void)c;
assert(rlist_empty(&c->waiters));
}
void
ipc_cond_signal(struct ipc_cond *e)
{
if (! rlist_empty(&e->waiters)) {
struct fiber *f;
f = rlist_shift_entry(&e->waiters, struct fiber, state);
fiber_wakeup(f);
}
}
void
ipc_cond_broadcast(struct ipc_cond *e)
{
while (! rlist_empty(&e->waiters)) {
struct fiber *f;
f = rlist_shift_entry(&e->waiters, struct fiber, state);
fiber_wakeup(f);
}
}
int
ipc_cond_wait_timeout(struct ipc_cond *c, ev_tstamp timeout)
{
struct fiber *f = fiber();
rlist_add_tail_entry(&c->waiters, f, state);
if (fiber_yield_timeout(timeout)) {
diag_set(TimedOut);
return -1;
}
return 0;
}
......@@ -367,43 +367,6 @@ ipc_channel_is_closed(struct ipc_channel *ch)
return ch->is_closed;
}
struct ipc_cond {
struct rlist waiters;
};
/**
* Initialize a cond - semantics as in POSIX condition variable.
*/
void
ipc_cond_create(struct ipc_cond *c);
/**
* Finalize a cond. UB if there are fibers waiting for a cond.
*/
void
ipc_cond_destroy(struct ipc_cond *c);
/**
* Wake one fiber waiting for the cond.
* Does nothing if no one is waiting.
*/
void
ipc_cond_signal(struct ipc_cond *c);
/**
* Wake all fibers waiting for the cond.
*/
void
ipc_cond_broadcast(struct ipc_cond *c);
int
ipc_cond_wait_timeout(struct ipc_cond *c, ev_tstamp timeout);
static inline int
ipc_cond_wait(struct ipc_cond *c)
{
return ipc_cond_wait_timeout(c, TIMEOUT_INFINITY);
}
#if defined(__cplusplus)
} /* extern "C" */
......
/*
* Copyright 2010-2017, Tarantool AUTHORS, please see AUTHORS file.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* 1. Redistributions of source code must retain the above
* copyright notice, this list of conditions and the
* following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#include "lua/fiber_cond.h"
#include "third_party/tarantool_ev.h"
#include "lua/utils.h"
#include "fiber.h"
#include <fiber_cond.h>
static const char cond_typename[] = "fiber.cond";
static int
lbox_ipc_cond(struct lua_State *L)
{
struct ipc_cond *e = lua_newuserdata(L, sizeof(*e));
if (e == NULL)
luaL_error(L, "fiber.cond: not enough memory");
ipc_cond_create(e);
luaL_getmetatable(L, cond_typename);
lua_setmetatable(L, -2);
return 1;
}
static inline struct ipc_cond *
lbox_check_cond(struct lua_State *L, int index, const char *source)
{
if (index > lua_gettop(L))
luaL_error(L, "usage: %s", source);
return (struct ipc_cond *)luaL_checkudata(L, index, cond_typename);
}
static int
lbox_ipc_cond_gc(struct lua_State *L)
{
ipc_cond_destroy(lbox_check_cond(L, 1, "cond:destroy()"));
return 0;
}
static int
lbox_ipc_cond_signal(struct lua_State *L)
{
ipc_cond_signal(lbox_check_cond(L, 1, "cond:signal()"));
return 0;
}
static int
lbox_ipc_cond_broadcast(struct lua_State *L)
{
ipc_cond_broadcast(lbox_check_cond(L, 1, "cond:broadcast()"));
return 0;
}
static int
lbox_ipc_cond_wait(struct lua_State *L)
{
static const char usage[] = "cond:wait([timeout])";
int rc;
struct ipc_cond *e = lbox_check_cond(L, 1, usage);
ev_tstamp timeout = TIMEOUT_INFINITY;
if (!lua_isnoneornil(L, 2)) {
if (!lua_isnumber(L, 2) ||
(timeout = lua_tonumber(L, 2)) < .0) {
luaL_error(L, "usage: %s", usage);
}
}
rc = ipc_cond_wait_timeout(e, timeout);
if (rc != 0)
luaL_testcancel(L);
lua_pushboolean(L, rc == 0);
return 1;
}
static int
lbox_ipc_cond_to_string(struct lua_State *L)
{
struct ipc_cond *cond = lbox_check_cond(L, 1, "");
(void)cond;
lua_pushstring(L, "cond");
return 1;
}
void
tarantool_lua_fiber_cond_init(struct lua_State *L)
{
static const struct luaL_Reg cond_meta[] = {
{"__gc", lbox_ipc_cond_gc},
{"__tostring", lbox_ipc_cond_to_string},
{"signal", lbox_ipc_cond_signal},
{"broadcast", lbox_ipc_cond_broadcast},
{"wait", lbox_ipc_cond_wait},
{NULL, NULL}
};
luaL_register_type(L, cond_typename, cond_meta);
static const struct luaL_Reg cond_lib[] = {
{"cond", lbox_ipc_cond},
{NULL, NULL}
};
luaL_register_module(L, "fiber", cond_lib);
lua_pop(L, 1);
}
#ifndef TARANTOOL_LUA_FIBER_COND_H_INCLUDED
#define TARANTOOL_LUA_FIBER_COND_H_INCLUDED 1
/*
* Copyright 2010-2017, Tarantool AUTHORS, please see AUTHORS file.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* 1. Redistributions of source code must retain the above
* copyright notice, this list of conditions and the
* following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#if defined(__cplusplus)
extern "C" {
#endif /* defined(__cplusplus) */
struct lua_State;
void tarantool_lua_fiber_cond_init(struct lua_State *L);
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
#endif /* TARANTOOL_LUA_FIBER_COND_H_INCLUDED */
......@@ -45,6 +45,7 @@
#include "version.h"
#include "coio.h"
#include "lua/fiber.h"
#include "lua/fiber_cond.h"
#include "lua/ipc.h"
#include "lua/errno.h"
#include "lua/socket.h"
......@@ -380,6 +381,7 @@ tarantool_lua_init(const char *tarantool_bin, int argc, char **argv)
tarantool_lua_utils_init(L);
tarantool_lua_fiber_init(L);
tarantool_lua_fiber_cond_init(L);
tarantool_lua_ipc_init(L);
tarantool_lua_errno_init(L);
tarantool_lua_fio_init(L);
......
......@@ -47,7 +47,6 @@ luaL_error(lua_State *L, const char *fmt, ...);
#include <fiber.h>
static const char channel_typename[] = "fiber.channel";
static const char cond_typename[] = "fiber.cond";
/******************** channel ***************************/
......@@ -272,76 +271,6 @@ lbox_ipc_channel_to_string(struct lua_State *L)
return 1;
}
static int
lbox_ipc_cond(struct lua_State *L)
{
struct ipc_cond *e = lua_newuserdata(L, sizeof(*e));
if (e == NULL)
luaL_error(L, "fiber.cond: not enough memory");
ipc_cond_create(e);
luaL_getmetatable(L, cond_typename);
lua_setmetatable(L, -2);
return 1;
}
static inline struct ipc_cond *
lbox_check_cond(struct lua_State *L, int index, const char *source)
{
if (index > lua_gettop(L))
luaL_error(L, "usage: %s", source);
return (struct ipc_cond *)luaL_checkudata(L, index, cond_typename);
}
static int
lbox_ipc_cond_gc(struct lua_State *L)
{
ipc_cond_destroy(lbox_check_cond(L, 1, "cond:destroy()"));
return 0;
}
static int
lbox_ipc_cond_signal(struct lua_State *L)
{
ipc_cond_signal(lbox_check_cond(L, 1, "cond:signal()"));
return 0;
}
static int
lbox_ipc_cond_broadcast(struct lua_State *L)
{
ipc_cond_broadcast(lbox_check_cond(L, 1, "cond:broadcast()"));
return 0;
}
static int
lbox_ipc_cond_wait(struct lua_State *L)
{
static const char usage[] = "cond:wait([timeout])";
int rc;
struct ipc_cond *e = lbox_check_cond(L, 1, usage);
ev_tstamp timeout = TIMEOUT_INFINITY;
if (!lua_isnoneornil(L, 2)) {
if (!lua_isnumber(L, 2) ||
(timeout = lua_tonumber(L, 2)) < .0) {
luaL_error(L, "usage: %s", usage);
}
}
rc = ipc_cond_wait_timeout(e, timeout);
if (rc != 0)
luaL_testcancel(L);
lua_pushboolean(L, rc == 0);
return 1;
}
static int
lbox_ipc_cond_to_string(struct lua_State *L)
{
struct ipc_cond *cond = lbox_check_cond(L, 1, "");
(void)cond;
lua_pushstring(L, "cond");
return 1;
}
void
tarantool_lua_ipc_init(struct lua_State *L)
{
......@@ -362,19 +291,8 @@ tarantool_lua_ipc_init(struct lua_State *L)
};
luaL_register_type(L, channel_typename, channel_meta);
static const struct luaL_Reg cond_meta[] = {
{"__gc", lbox_ipc_cond_gc},
{"__tostring", lbox_ipc_cond_to_string},
{"signal", lbox_ipc_cond_signal},
{"broadcast", lbox_ipc_cond_broadcast},
{"wait", lbox_ipc_cond_wait},
{NULL, NULL}
};
luaL_register_type(L, cond_typename, cond_meta);
static const struct luaL_Reg ipc_lib[] = {
{"channel", lbox_ipc_channel},
{"cond", lbox_ipc_cond},
{NULL, NULL}
};
......
fiber = require('fiber')
---
...
-- fiber.cond
c = fiber.cond()
---
...
tostring(c)
---
- cond
...
-- args validation
c.wait()
---
- error: 'usage: cond:wait([timeout])'
...
c.wait('1')
---
- error: 'bad argument #1 to ''?'' (fiber.cond expected, got string)'
...
c:wait('1')
---
- false
...
c:wait(-1)
---
- error: 'usage: cond:wait([timeout])'
...
-- timeout
c:wait(0.1)
---
- false
...
-- wait success
fiber.create(function() fiber.sleep(.5); c:broadcast() end) and c:wait(.6)
---
- true
...
-- signal
t = {}
---
...
for i = 1,4 do fiber.create(function() c:wait(); table.insert(t, '#') end) end
---
...
c:signal()
---
...
fiber.sleep(0.1)
---
...
t
---
- - '#'
...
-- broadcast
c:broadcast()
---
...
fiber.sleep(0.1)
---
...
t
---
- - '#'
- '#'
- '#'
- '#'
...
fiber = require('fiber')
-- fiber.cond
c = fiber.cond()
tostring(c)
-- args validation
c.wait()
c.wait('1')
c:wait('1')
c:wait(-1)
-- timeout
c:wait(0.1)
-- wait success
fiber.create(function() fiber.sleep(.5); c:broadcast() end) and c:wait(.6)
-- signal
t = {}
for i = 1,4 do fiber.create(function() c:wait(); table.insert(t, '#') end) end
c:signal()
fiber.sleep(0.1)
t
-- broadcast
c:broadcast()
fiber.sleep(0.1)
t
......@@ -578,69 +578,3 @@ refs -- must be zero
---
- 0
...
-- fiber.cond
c = fiber.cond()
---
...
tostring(c)
---
- cond
...
-- args validation
c.wait()
---
- error: 'usage: cond:wait([timeout])'
...
c.wait('1')
---
- error: 'bad argument #1 to ''?'' (fiber.cond expected, got string)'
...
c:wait('1')
---
- false
...
c:wait(-1)
---
- error: 'usage: cond:wait([timeout])'
...
-- timeout
c:wait(0.1)
---
- false
...
-- wait success
fiber.create(function() fiber.sleep(.5); c:broadcast() end) and c:wait(.6)
---
- true
...
-- signal
t = {}
---
...
for i = 1,4 do fiber.create(function() c:wait(); table.insert(t, '#') end) end
---
...
c:signal()
---
...
fiber.sleep(0.1)
---
...
t
---
- - '#'
...
-- broadcast
c:broadcast()
---
...
fiber.sleep(0.1)
---
...
t
---
- - '#'
- '#'
- '#'
- '#'
...
......@@ -210,26 +210,3 @@ refs
ch:close()
collectgarbage('collect')
refs -- must be zero
-- fiber.cond
c = fiber.cond()
tostring(c)
-- args validation
c.wait()
c.wait('1')
c:wait('1')
c:wait(-1)
-- timeout
c:wait(0.1)
-- wait success
fiber.create(function() fiber.sleep(.5); c:broadcast() end) and c:wait(.6)
-- signal
t = {}
for i = 1,4 do fiber.create(function() c:wait(); table.insert(t, '#') end) end
c:signal()
fiber.sleep(0.1)
t
-- broadcast
c:broadcast()
fiber.sleep(0.1)
t
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment