Skip to content
Snippets Groups Projects
Commit c9ef7471 authored by Roman Tsisyk's avatar Roman Tsisyk
Browse files

Add coio_wait() to public C API

Extract bsdsocket_io_wait() from bsdsocket.cc to public C API.
This very simple patch allow to use Tarantool event machine in
stored C procedures.

Other changes: replace fiber_wakeup() with fiber_call() here to save extra
               event loop iteration.
parent 008c20c2
No related branches found
No related tags found
No related merge requests found
......@@ -705,3 +705,42 @@ coio_waitpid(pid_t pid)
return status;
}
/* Values of COIO_READ(WRITE) must equal to EV_READ(WRITE) */
static_assert(COIO_READ == (int) EV_READ, "TNT_IO_READ");
static_assert(COIO_WRITE == (int) EV_WRITE, "TNT_IO_WRITE");
struct coio_wdata {
struct fiber *fiber;
int revents;
};
static void
coio_wait_cb(struct ev_loop *loop, ev_io *watcher, int revents)
{
(void) loop;
struct coio_wdata *wdata = (struct coio_wdata *) watcher->data;
wdata->revents = revents;
fiber_call(wdata->fiber);
}
int
coio_wait(int fd, int events, double timeout)
{
struct ev_io io;
coio_init(&io, fd);
ev_io_init(&io, coio_wait_cb, fd, events);
struct coio_wdata wdata = {
/* .fiber = */ fiber(),
/* .revents = */ 0
};
io.data = &wdata;
/* A special hack to work with zero timeout */
ev_set_priority(&io, EV_MAXPRI);
ev_io_start(loop(), &io);
fiber_yield_timeout(timeout);
ev_io_stop(loop(), &io);
return wdata.revents;
}
......@@ -32,6 +32,7 @@
*/
#include "evio.h"
#include "fiber.h"
#include "trivia/util.h"
/**
* Co-operative I/O
......@@ -183,5 +184,28 @@ coio_stat_stat_timeout(ev_stat *stat, ev_tstamp delay);
int
coio_waitpid(pid_t pid);
/** \cond public */
enum {
/** READ event */
COIO_READ = 0x1,
/** WRITE event */
COIO_WRITE = 0x2,
};
/**
* Wait until READ or WRITE event on socket (\a fd). Yields.
* \param fd - non-blocking socket file description
* \param events - requested events to wait.
* Combination of TNT_IO_READ | TNT_IO_WRITE bit flags.
* \param timeoout - timeout in seconds.
* \retval 0 - timeout
* \retval >0 - returned events. Combination of TNT_IO_READ | TNT_IO_WRITE
* bit flags.
*/
API_EXPORT int
coio_wait(int fd, int event, double timeout);
/** \endcond public */
#endif /* TARANTOOL_COIO_H_INCLUDED */
......@@ -49,6 +49,7 @@ extern "C" {
#include <lualib.h>
}
#include <coio.h>
#include <coeio.h>
#include <fiber.h>
#include <scoped_guard.h>
......@@ -379,21 +380,6 @@ bsdsocket_nonblock(int fh, int mode)
return mode ? 1 : 0;
}
struct bsdsocket_io_wdata {
struct fiber *fiber;
int io;
};
static void
bsdsocket_io(struct ev_loop *loop, ev_io *watcher, int revents)
{
(void) loop;
struct bsdsocket_io_wdata *wdata =
(struct bsdsocket_io_wdata *)watcher->data;
wdata->io = revents;
fiber_wakeup(wdata->fiber);
}
static int
lbox_bsdsocket_iowait(struct lua_State *L)
{
......@@ -401,35 +387,7 @@ lbox_bsdsocket_iowait(struct lua_State *L)
int events = lua_tointeger(L, 2);
ev_tstamp timeout = lua_tonumber(L, 3);
switch (events) {
case 0:
events = EV_READ;
break;
case 1:
events = EV_WRITE;
break;
case 2:
events = EV_READ | EV_WRITE;
break;
default:
assert(false);
}
struct ev_io io;
ev_io_init(&io, bsdsocket_io, fh, events);
struct bsdsocket_io_wdata wdata = { fiber(), 0 };
io.data = &wdata;
ev_set_priority(&io, EV_MAXPRI);
ev_io_start(loop(), &io);
fiber_yield_timeout(timeout);
ev_io_stop(loop(), &io);
int ret = 0;
if (wdata.io & EV_READ)
ret |= 1;
if (wdata.io & EV_WRITE)
ret |= 2;
int ret = coio_wait(fh, events, timeout);
lua_pushinteger(L, ret);
return 1;
......
......@@ -270,20 +270,20 @@ local function wait_safely(self, what, timeout)
self.waiters[fid] = true
local res = internal.iowait(fd, what, timeout)
self.waiters[fid] = nil
fiber.testcancel()
if res == 0 then
self._errno = boxerrno.ETIMEDOUT
return 0
end
fiber.testcancel()
return res
end
socket_methods.readable = function(self, timeout)
return wait_safely(self, 0, timeout) ~= 0
return wait_safely(self, 1, timeout) ~= 0
end
socket_methods.wait = function(self, timeout)
local wres = wait_safely(self, 2, timeout)
local wres = wait_safely(self, 3, timeout)
local res = ''
if bit.band(wres, 1) ~= 0 then
res = res .. 'R'
......@@ -295,7 +295,7 @@ socket_methods.wait = function(self, timeout)
end
socket_methods.writable = function(self, timeout)
return wait_safely(self, 1, timeout) ~= 0
return wait_safely(self, 2, timeout) ~= 0
end
socket_methods.listen = function(self, backlog)
......
set(api_headers
${CMAKE_CURRENT_BINARY_DIR}/config.h
${CMAKE_SOURCE_DIR}/src/say.h
${CMAKE_SOURCE_DIR}/src/coio.h
${CMAKE_SOURCE_DIR}/src/coeio.h
${CMAKE_SOURCE_DIR}/src/lua/utils.h
${CMAKE_SOURCE_DIR}/src/box/txn.h
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment