-
Vladislav Shpilevoy authored
Checks for fiber_name length worked with FIBER_NAME_MAX, while one byte in fact was occupied by terminating zero. Closes #4011
Vladislav Shpilevoy authored: Checks for fiber_name length worked with FIBER_NAME_MAX, while one byte in fact was occupied by terminating zero. Closes #4011
fiber.h 15.49 KiB
#ifndef TARANTOOL_FIBER_H_INCLUDED
#define TARANTOOL_FIBER_H_INCLUDED
/*
* Copyright 2010-2016, Tarantool AUTHORS, please see AUTHORS file.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* 1. Redistributions of source code must retain the above
* copyright notice, this list of conditions and the
* following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#include "trivia/config.h"
#include <stdbool.h>
#include <stdint.h>
#include <unistd.h>
#include "tt_pthread.h"
#include "third_party/tarantool_ev.h"
#include "diag.h"
#include "trivia/util.h"
#include "small/mempool.h"
#include "small/region.h"
#include "small/rlist.h"
#include "salad/stailq.h"
#include <third_party/coro/coro.h>
#if defined(__cplusplus)
extern "C" {
#endif /* defined(__cplusplus) */
/**
* Maximal length of a fiber name, not counting the terminating
* zero: struct fiber reserves FIBER_NAME_MAX + 1 bytes for its
* name.
*/
enum { FIBER_NAME_MAX = 32 };
/**
 * Bits of the fiber flags mask (see the flags member of
 * struct fiber).
 */
enum {
	/**
	 * A suspended fiber carrying this bit may safely be
	 * resumed by a spurious wakeup, e.g. to force it to
	 * notice that it has been cancelled.
	 */
	FIBER_IS_CANCELLABLE = 0x01,
	/** The fiber has been requested to end prematurely. */
	FIBER_IS_CANCELLED = 0x02,
	/**
	 * Selects how the fiber is garbage collected once its
	 * function ends. Either the fiber is collected
	 * automatically, or some other fiber waits for the end
	 * of this fiber and collects it with fiber_join();
	 * fiber_join() requires this flag to be set.
	 */
	FIBER_IS_JOINABLE = 0x04,
	/**
	 * The fiber sits in cord->ready list or in a call chain
	 * created by fiber_schedule_list(). Lets fiber_wakeup()
	 * avoid a double wakeup of an already scheduled fiber.
	 */
	FIBER_IS_READY = 0x08,
	/**
	 * Set after the fiber function has ended and before the
	 * fiber is recycled.
	 */
	FIBER_IS_DEAD = 0x10,
	/** The fiber uses a custom stack size. */
	FIBER_CUSTOM_STACK = 0x20,
	/** Default flag set. */
	FIBER_DEFAULT_FLAGS = FIBER_IS_CANCELLABLE
};
/** \cond public */
/**
* Fiber attributes container.
* Forward declaration: the public declarations below only pass
* pointers to it; the structure itself is defined further down
* in this header.
*/
struct fiber_attr;
/**
 * Create a new fiber attribute container and initialize it
 * with default parameters.
 *
 * One container can be used to create many fibers: the fibers
 * do not take ownership of it (see fiber_attr_delete()).
 *
 * \retval new fiber attribute container
 */
API_EXPORT struct fiber_attr *
fiber_attr_new(void);
/**
* Delete the fiber_attr and free all allocated resources.
* This is safe even when fibers created with this attribute
* still exist.
*
* \param fiber_attr fiber attribute container to delete
*/
API_EXPORT void
fiber_attr_delete(struct fiber_attr *fiber_attr);
/**
* Set stack size for the fiber attribute.
*
* \param fiber_attr fiber attribute container
* \param stack_size stack size for new fibers
* \return status code; presumably 0 on success and non-zero on
* failure - TODO confirm against the implementation
*/
API_EXPORT int
fiber_attr_setstacksize(struct fiber_attr *fiber_attr, size_t stack_size);
/**
* Get stack size from the fiber attribute.
*
* \param fiber_attr fiber attribute container or NULL for default
* \retval stack size
*/
API_EXPORT size_t
fiber_attr_getstacksize(struct fiber_attr *fiber_attr);
/**
* Fiber - contains information about fiber
*/
struct fiber;
/**
* Fiber function: the function run inside a fiber. It takes its
* arguments as a va_list and returns an int code (see
* fiber_join()).
*/
typedef int (*fiber_func)(va_list);
/**
 * Return the current fiber.
 *
 * \retval the fiber that called this function
 */
API_EXPORT struct fiber *
fiber_self(void);
/**
* Create a new fiber.
*
* Takes a fiber from fiber cache, if it's not empty.
* Can fail only if there is not enough memory for
* the fiber structure or fiber stack.
*
* The created fiber automatically returns itself
* to the fiber cache when its "main" function
* completes.
*
* \param name string with fiber name
* \param f function to run inside the fiber
* \retval new fiber, or NULL on failure (fiber_new_xc() in this
* header checks for NULL and then calls diag_raise())
*
* \sa fiber_start
*/
API_EXPORT struct fiber *
fiber_new(const char *name, fiber_func f);
/**
* Create a new fiber with defined attributes.
*
* Can fail only if there is not enough memory for
* the fiber structure or fiber stack.
*
* The created fiber automatically returns itself
* to the fiber cache, if it has the default stack size,
* when its "main" function completes.
*
* \param name string with fiber name
* \param fiber_attr fiber attributes
* \param f function to run inside the fiber
* \retval new fiber; presumably NULL on failure, like
* fiber_new() - TODO confirm
*
* \sa fiber_start
*/
API_EXPORT struct fiber *
fiber_new_ex(const char *name, const struct fiber_attr *fiber_attr, fiber_func f);
/**
* Return control to another fiber and wait until the yielding
* fiber is woken up.
*
* \sa fiber_wakeup
*/
API_EXPORT void
fiber_yield(void);
/**
* Start execution of created fiber.
*
* \param callee fiber to start
* \param ... arguments to start the fiber with (the fiber
* function type, fiber_func, takes them as a va_list)
*
* \sa fiber_new
*/
API_EXPORT void
fiber_start(struct fiber *callee, ...);
/**
* Interrupt a synchronous wait of a fiber.
*
* \param f fiber to be woken up
*/
API_EXPORT void
fiber_wakeup(struct fiber *f);
/**
* Cancel the subject fiber (set the FIBER_IS_CANCELLED flag).
*
* If the target fiber's FIBER_IS_CANCELLABLE flag is set, then it
* is woken up (maybe prematurely). Then the current fiber yields
* until the target fiber is dead (or is woken up by
* fiber_wakeup()).
*
* \param f fiber to be cancelled
*
* \sa fiber_wakeup
*/
API_EXPORT void
fiber_cancel(struct fiber *f);
/**
* Make it possible or not possible to wake up the current
* fiber immediately when it's cancelled.
*
* @param yesno status to set
* @return previous state.
*
* @sa fiber_cancel
*/
API_EXPORT bool
fiber_set_cancellable(bool yesno);
/**
* Set fiber to be joinable (false by default).
* A joinable fiber is waited for with fiber_join(), which
* requires the FIBER_IS_JOINABLE flag to be set.
*
* \param fiber fiber to change the status of
* \param yesno status to set
*/
API_EXPORT void
fiber_set_joinable(struct fiber *fiber, bool yesno);
/**
* Wait until the fiber is dead and then move its execution
* status to the caller.
* The fiber must not be detached (@sa fiber_set_joinable()).
* @pre FIBER_IS_JOINABLE flag is set.
*
* \param f fiber to be joined
* \return fiber function ret code
*/
API_EXPORT int
fiber_join(struct fiber *f);
/**
* Put the current fiber to sleep for at least 's' seconds.
*
* \param s time to sleep, in seconds
*
* \note this is a cancellation point (\sa fiber_is_cancelled)
*/
API_EXPORT void
fiber_sleep(double s);
/**
 * Check current fiber for cancellation (it must be checked
 * manually).
 *
 * \retval true if the current fiber has been cancelled
 *
 * \sa fiber_cancel
 */
API_EXPORT bool
fiber_is_cancelled(void);
/**
* Report loop begin time as double (cheap).
* Uses real time clock.
*
* \sa fiber_clock for the monotonic counterpart
*/
API_EXPORT double
fiber_time(void);
/**
* Report loop begin time as 64-bit int.
* Uses real time clock.
*
* \sa fiber_clock64 for the monotonic counterpart
*/
API_EXPORT uint64_t
fiber_time64(void);
/**
* Report loop begin time as double (cheap).
* Uses monotonic clock.
*
* \sa fiber_time for the real time counterpart
*/
API_EXPORT double
fiber_clock(void);
/**
* Report loop begin time as 64-bit int.
* Uses monotonic clock.
*
* \sa fiber_time64 for the real time counterpart
*/
API_EXPORT uint64_t
fiber_clock64(void);
/**
* Reschedule fiber to end of event loop cycle.
* The function takes no arguments, so presumably it acts on the
* current fiber - TODO confirm.
*/
API_EXPORT void
fiber_reschedule(void);
struct slab_cache;
/**
* Return slab_cache suitable to use with tarantool/small library
*/
API_EXPORT struct slab_cache *
cord_slab_cache(void);
/** \endcond public */
/**
* Fiber attribute container.
* Definition of the type that the public part of this header
* only forward-declares.
*/
struct fiber_attr {
/** Fiber stack size. */
size_t stack_size;
/** Fiber flags (presumably FIBER_* bits - TODO confirm). */
uint32_t flags;
};
/**
* Init fiber attr with default values.
*
* \param fiber_attr fiber attribute container to initialize
*/
void
fiber_attr_create(struct fiber_attr *fiber_attr);
/**
* Under no circumstances is this header file allowed to include
* application-specific headers like session.h or txn.h. One is
* only allowed to announce a struct and add an opaque pointer
* to it.
*/
struct session;
struct txn;
struct credentials;
struct lua_State;
struct ipc_wait_pad;
/**
* A fiber: a coroutine context together with its stack, its
* scheduling state and its fiber-local storage.
*
* This struct is considered as non-POD when compiling by g++.
* You can safely ignore all offset_of-related warnings.
* See http://gcc.gnu.org/bugzilla/show_bug.cgi?id=31488
*/
struct fiber {
/** Coroutine context (coro_context comes from coro.h). */
coro_context ctx;
/** Coro stack slab. */
struct slab *stack_slab;
/** Coro stack addr. */
void *stack;
/** Coro stack size. */
size_t stack_size;
/** Valgrind stack id. */
unsigned int stack_id;
/** A garbage-collected memory pool. */
struct region gc;
/**
* The fiber which should be scheduled when
* this fiber yields.
*/
struct fiber *caller;
/** Number of context switches. */
int csw;
/** Fiber id. */
uint32_t fid;
/** Fiber flags: a mask of FIBER_* bits, see fiber_is_dead(). */
uint32_t flags;
/** Link in cord->alive or cord->dead list. */
struct rlist link;
/** Link in cord->ready list. */
struct rlist state;
/** Triggers invoked before this fiber yields. Must not throw. */
struct rlist on_yield;
/** Triggers invoked before this fiber stops. Must not throw. */
struct rlist on_stop;
/**
* The list of fibers awaiting for this fiber's timely
* (or untimely) death.
*/
struct rlist wake;
/** The fiber function. */
fiber_func f;
/**
* Presumably the arguments passed to fiber_start() for the
* fiber function - TODO confirm.
*/
va_list f_data;
/**
* Presumably the return code of the fiber function, as
* reported by fiber_join() - TODO confirm.
*/
int f_ret;
/** Fiber local storage. */
struct {
/**
* Current transaction, session and the active
* user credentials are shared among multiple
* requests and valid even out of a former.
*/
struct session *session;
struct credentials *credentials;
struct txn *txn;
/**
* Lua stack and the optional
* fiber.storage Lua reference.
*/
struct {
struct lua_State *stack;
int ref;
} lua;
/**
* Iproto sync.
*/
struct {
uint64_t sync;
} net;
} storage;
/** An object to wait for incoming message or a reader. */
struct ipc_wait_pad *wait_pad;
/** Exception which caused this fiber's death. */
struct diag diag;
/**
* Fiber name: up to FIBER_NAME_MAX characters plus one byte
* for the terminating zero.
*/
char name[FIBER_NAME_MAX + 1];
};
/**
* NOTE(review): not referenced anywhere else in this header;
* presumably the maximal depth of a chain of fiber calls -
* confirm against the implementation.
*/
enum { FIBER_CALL_STACK = 16 };
/** Opaque type; struct cord keeps a pointer to it in on_exit. */
struct cord_on_exit;
/**
* @brief An independent execution unit that can be managed by a separate OS
* thread. Each cord consists of fibers to implement cooperative multitasking
* model.
*/
struct cord {
/** The fiber that is currently being executed. */
struct fiber *fiber;
/** Event loop of this cord, see the loop() macro. */
struct ev_loop *loop;
/**
* Every new fiber gets a new monotonic id. Ids 1-100 are
* reserved.
*/
uint32_t max_fid;
/** Thread id; presumably of the thread running this cord. */
pthread_t id;
/**
* NOTE(review): presumably a handler run when the cord
* exits - confirm against the implementation.
*/
const struct cord_on_exit *on_exit;
/** A helper hash to map id -> fiber. */
struct mh_i32ptr_t *fiber_registry;
/** All fibers */
struct rlist alive;
/** Fibers, ready for execution */
struct rlist ready;
/** A cache of dead fibers for reuse */
struct rlist dead;
/** A watcher to have a single async event for all ready fibers.
* This technique is necessary to be able to suspend
* a single fiber on a few watchers (for example,
* a timeout and an event from network, whichever comes
* first).
* */
ev_async wakeup_event;
/**
* libev sleeps at least backend_mintime, which is 1 ms in
* case of poll()/Linux, unless there are idle watchers.
* This is a special hack to speed up fiber_sleep(0),
* i.e. a sleep with a zero timeout, to ensure that there
* is no 1 ms delay in case of zero sleep timeout.
*/
ev_idle idle_event;
/** A memory cache for (struct fiber) */
struct mempool fiber_mempool;
/** A runtime slab cache for general use in this cord. */
struct slab_cache slabc;
/** The "main" fiber of this cord, the scheduler. */
struct fiber sched;
/**
* Cord name, returned by cord_name().
* NOTE(review): unlike the name of struct fiber, this buffer
* is FIBER_NAME_MAX bytes with no extra byte for the
* terminating zero - confirm that cord_set_name() truncates
* accordingly.
*/
char name[FIBER_NAME_MAX];
};
/** Per-thread pointer to a cord; read through the cord() macro. */
extern __thread struct cord *cord_ptr;
/** The cord of the calling thread. */
#define cord() cord_ptr
/**
 * The fiber currently executed in the cord of the calling
 * thread. The expansion is parenthesized, like loop(), so that
 * it stays a single operand in any surrounding expression.
 */
#define fiber() (cord()->fiber)
/** The event loop of the cord of the calling thread. */
#define loop() (cord()->loop)
/**
* Presumably initializes @a cord and gives it the name @a name -
* TODO confirm against the implementation.
*
* @param cord cord
* @param name cord name
*/
void
cord_create(struct cord *cord, const char *name);
/**
* Presumably releases the resources of @a cord set up by
* cord_create() - TODO confirm against the implementation.
*
* @param cord cord
*/
void
cord_destroy(struct cord *cord);
/**
* Start a cord with the given thread function.
* The return value of the function can be collected
* with cord_join(). The function *must catch* all
* exceptions and leave them in the diagnostics
* area, cord_join() moves the exception from the
* terminated cord to the caller of cord_join().
*
* @param cord cord to start
* @param name cord name
* @param f thread function
* @param arg argument passed to @a f
* @return status code; presumably 0 on success and -1 on
* failure - TODO confirm
*/
int
cord_start(struct cord *cord, const char *name,
void *(*f)(void *), void *arg);
/**
* Like cord_start(), but starts the event loop and
* a fiber in the event loop. The event loop ends when the
* fiber in main fiber dies/returns. The exception of the main
* fiber is propagated to cord_cojoin().
*
* @param cord cord to start
* @param name cord name
* @param f fiber function to run in the event loop
* @param arg argument for @a f
* @return status code; presumably 0 on success and -1 on
* failure - TODO confirm
*/
int
cord_costart(struct cord *cord, const char *name, fiber_func f, void *arg);
/**
* Yield until \a cord has terminated.
*
* If \a cord has terminated with an uncaught exception,
* the exception is moved to the current fiber's diagnostics
* area, otherwise the current fiber's diagnostics area is
* cleared.
*
* @param cord cord
* @sa pthread_join()
*
* @return 0 on success, -1 if pthread_join failed or the
* thread function terminated with an exception.
*/
int
cord_cojoin(struct cord *cord);
/**
* Wait for \a cord to terminate. If \a cord has already
* terminated, then returns immediately.
*
* @post If the subject cord terminated with an exception,
* the exception is preserved in the caller's cord.
*
* @param cord cord
* @return 0 on success, -1 if pthread_join failed or the
* thread function terminated with an exception.
*/
int
cord_join(struct cord *cord);
/**
* Set a cord name.
* NOTE(review): no cord is passed, so presumably the name of the
* calling thread's cord (cord()) is set - confirm.
*
* @param name new cord name
*/
void
cord_set_name(const char *name);
/**
 * Get the name of a cord.
 *
 * @param cord Cord to get the name of.
 * @return Pointer to the name buffer stored inside @a cord.
 */
static inline const char *
cord_name(struct cord *cord)
{
	const char *stored_name = cord->name;
	return stored_name;
}
/**
 * True if this cord represents the process main thread.
 */
bool
cord_is_main(void);
/**
* Presumably initializes the fiber subsystem - TODO confirm
* against the implementation.
*
* @param fiber_invoke function used to call a fiber function
* with its arguments; fiber_c_invoke() and fiber_cxx_invoke()
* defined in this header have the matching signature.
*/
void
fiber_init(int (*fiber_invoke)(fiber_func f, va_list ap));
/**
* Presumably the counterpart of fiber_init() - TODO confirm
* against the implementation.
*/
void
fiber_free(void);
/**
* Set fiber name.
* The name buffer of struct fiber holds up to FIBER_NAME_MAX
* characters plus the terminating zero.
* NOTE(review): a longer @a name is presumably truncated -
* confirm against the implementation.
*
* @param fiber Fiber to set name for.
* @param name A new name of @a fiber.
*/
void
fiber_set_name(struct fiber *fiber, const char *name);
/**
 * Get the name of a fiber.
 *
 * @param f Fiber to get the name of.
 * @return Pointer to the name buffer stored inside @a f.
 */
static inline const char *
fiber_name(struct fiber *f)
{
	const char *stored_name = f->name;
	return stored_name;
}
/**
 * Stack check for the current fiber.
 * NOTE(review): the meaning of the returned value is not visible
 * in this header - confirm against the implementation.
 */
bool
fiber_checkstack(void);
/**
* @brief yield & check for timeout
* @param delay timeout (ev_tstamp; presumably in seconds, as in
* libev - TODO confirm)
* @return true if timeout exceeded
*/
bool
fiber_yield_timeout(ev_tstamp delay);
/**
* Presumably destroys all fibers - TODO confirm against the
* implementation.
* NOTE(review): the parameter list is empty, which in C leaves
* the parameters unspecified, unlike the (void) used by other
* declarations here. Check the definition before tightening it.
*/
void
fiber_destroy_all();
/**
* NOTE(review): undocumented; presumably related to the gc
* region of struct fiber - confirm against the implementation.
*/
void
fiber_gc(void);
/**
* NOTE(review): undocumented; presumably transfers control to
* @a callee - confirm against the implementation.
*/
void
fiber_call(struct fiber *callee);
/**
* Find a fiber by id. Presumably uses the id -> fiber registry
* of struct cord - TODO confirm, including the result for an
* unknown id.
*/
struct fiber *
fiber_find(uint32_t fid);
/**
* NOTE(review): undocumented; the signature looks like a libev
* watcher callback - confirm against the implementation.
*/
void
fiber_schedule_cb(ev_loop * /* loop */, ev_watcher *watcher, int revents);
/**
 * Check whether the FIBER_IS_DEAD flag is set for a fiber.
 *
 * @param f Fiber to check.
 * @return true if FIBER_IS_DEAD is set in the flags of @a f.
 */
static inline bool
fiber_is_dead(struct fiber *f)
{
	return (f->flags & FIBER_IS_DEAD) != 0;
}
/**
* Callback type taken by fiber_stat(): receives a fiber and an
* opaque context pointer.
*/
typedef int (*fiber_stat_cb)(struct fiber *f, void *ctx);
/**
* Presumably calls @a cb for fibers, passing @a cb_ctx as the
* context argument - TODO confirm which fibers are visited and
* what the return value means.
*/
int
fiber_stat(fiber_stat_cb cb, void *cb_ctx);
/**
 * Plain C invoker of a fiber function: calls @a f with @a ap and
 * returns whatever it returns. Useful for C unit tests.
 */
static inline int
fiber_c_invoke(fiber_func f, va_list ap)
{
	return (*f)(ap);
}
#if defined(__cplusplus)
} /* extern "C" */
/*
 * Raise FiberIsCancelled if fiber_is_cancelled() reports that
 * the current fiber was cancelled; otherwise do nothing.
 */
static inline void
fiber_testcancel(void)
{
	/*
	 * FiberIsCancelled can be caught with a try..catch block
	 * or with pcall()/xpcall() in Lua. The FIBER_IS_CANCELLED
	 * flag stays set nevertheless, so the next unprotected
	 * call of this function kills the fiber.
	 */
	if (!fiber_is_cancelled())
		return;
	tnt_raise(FiberIsCancelled);
}
/**
 * Exception-raising wrapper of fiber_new(): when fiber_new()
 * returns NULL, calls diag_raise() instead of returning.
 *
 * @param name Fiber name.
 * @param func Fiber function.
 * @return The new fiber, never NULL.
 */
static inline struct fiber *
fiber_new_xc(const char *name, fiber_func func)
{
	struct fiber *new_fiber = fiber_new(name, func);
	if (!new_fiber) {
		diag_raise();
		unreachable();
	}
	return new_fiber;
}
/**
 * C++ invoker of a fiber function: calls @a f with @a ap and
 * turns a thrown struct error pointer into the return code -1.
 *
 * @return The value returned by @a f, or -1 if it threw.
 */
static inline int
fiber_cxx_invoke(fiber_func f, va_list ap)
{
	int rc;
	try {
		rc = f(ap);
	} catch (struct error *) {
		rc = -1;
	}
	return rc;
}
#endif /* defined(__cplusplus) */
/**
 * Allocation callback on top of a region: allocates @a size
 * bytes aligned as uint64_t from the region passed as @a ctx.
 *
 * @param ctx Pointer to a struct region.
 * @param size Number of bytes to allocate.
 * @return The allocated memory, or NULL with an OutOfMemory
 * diagnostic set if the allocation failed.
 */
static inline void *
region_aligned_alloc_cb(void *ctx, size_t size)
{
	struct region *region = (struct region *) ctx;
	void *mem = region_aligned_alloc(region, size, alignof(uint64_t));
	if (mem != NULL)
		return mem;
	diag_set(OutOfMemory, size, "region", "new slab");
	return NULL;
}
#endif /* TARANTOOL_FIBER_H_INCLUDED */