Skip to content
Snippets Groups Projects
Commit d653d088 authored by godzie44's avatar godzie44 Committed by Dmitry Ivanov
Browse files

cbus: introduce lcpipe - light cpipe

Introduce a new type of cbus pipe - lcpipe. The current pipe in the
cbus - cpipe - has a number of limitations; first of all, a cpipe
cannot be used from third-party threads: it only works as a channel
between two cords. That is why lcpipe is needed: its main responsibility
is to create a channel between any thread and a tarantool cord.

Internally an lcpipe is a cpipe, but:
- the on-flush triggers are removed, because triggers use a thread-local
  mem-pool, which is not available in a third-party thread
- the producer event loop is removed, because there is no libev event
  loop in a third-party thread

Also, lcpipe interface is exported to the outside world.

NO_DOC=core feature
parent 7900c396
No related branches found
No related tags found
No related merge requests found
## feature/build
* export cbus interface:
* cbus_endpoint_new
* cbus_endpoint_delete
* cbus_loop
* cbus_process
* add a new type of cbus pipe: lcpipe. Its main purpose is to send messages from a 3rd party thread into a tarantool cord
......@@ -624,3 +624,14 @@ fiber_channel_get_msg_timeout
fiber_channel_has_readers
fiber_channel_has_writers
fiber_channel_new
# picodata cbus
cbus_endpoint_delete
cbus_endpoint_new
cbus_loop
cbus_process
lcpipe_delete
lcpipe_flush_input
lcpipe_new
lcpipe_push
lcpipe_push_now
......@@ -87,6 +87,25 @@ static void
cpipe_flush_cb(ev_loop * /* loop */, struct ev_async *watcher,
int /* events */);
/**
 * Acquire the cbus endpoint identified by a consumer name.
 * Blocks until the consumer has joined the bus, then stores the
 * endpoint in @a pipe_endpoint and takes one pipe reference on it.
 */
static inline void
acquire_consumer(struct cbus_endpoint **pipe_endpoint, const char *name)
{
	struct cbus_endpoint *ep;

	tt_pthread_mutex_lock(&cbus.mutex);
	/* Wait for the consumer to register itself on the bus. */
	while ((ep = cbus_find_endpoint_locked(&cbus, name)) == NULL)
		tt_pthread_cond_wait(&cbus.cond, &cbus.mutex);
	ep->n_pipes++;
	*pipe_endpoint = ep;
	tt_pthread_mutex_unlock(&cbus.mutex);
}
void
cpipe_create(struct cpipe *pipe, const char *consumer)
{
......@@ -100,16 +119,7 @@ cpipe_create(struct cpipe *pipe, const char *consumer)
pipe->flush_input.data = pipe;
rlist_create(&pipe->on_flush);
tt_pthread_mutex_lock(&cbus.mutex);
struct cbus_endpoint *endpoint =
cbus_find_endpoint_locked(&cbus, consumer);
while (endpoint == NULL) {
tt_pthread_cond_wait(&cbus.cond, &cbus.mutex);
endpoint = cbus_find_endpoint_locked(&cbus, consumer);
}
pipe->endpoint = endpoint;
++pipe->endpoint->n_pipes;
tt_pthread_mutex_unlock(&cbus.mutex);
acquire_consumer(&pipe->endpoint, consumer);
}
struct cmsg_poison {
......@@ -177,6 +187,145 @@ cpipe_destroy(struct cpipe *pipe)
TRASH(pipe);
}
/**
 * Allocate an lcpipe and connect it to the consumer endpoint
 * named @a consumer. Blocks until the consumer joins the bus.
 * The returned pipe must be released with lcpipe_delete().
 */
struct lcpipe *
lcpipe_new(const char *consumer)
{
	struct lcpipe *pipe = xmalloc(sizeof(*pipe));
	/* Empty staging queue; batching is effectively unbounded. */
	pipe->n_input = 0;
	pipe->max_input = INT_MAX;
	stailq_create(&pipe->input);
	acquire_consumer(&pipe->endpoint, consumer);
	return pipe;
}
/**
 * Flush all staged messages into the consumer endpoint's output
 * queue and wake the consumer up if that queue was empty.
 * Runs on the producer (3rd party) thread: there is no libev loop
 * on this side, so the consumer is notified via ev_async_send().
 */
static void
lcpipe_flush(struct lcpipe *pipe)
{
	struct cbus_endpoint *endpoint = pipe->endpoint;
	/* Nothing staged - nothing to deliver. */
	if (pipe->n_input == 0)
		return;

	/* Trigger task processing when the queue becomes non-empty. */
	bool output_was_empty;

	/*
	 * We need to set a thread cancellation guard, because
	 * another thread may cancel the current thread
	 * (write() is a cancellation point in ev_async_send)
	 * and the activation of the ev_async watcher
	 * through ev_async_send will fail.
	 */
	int old_cancel_state;
	tt_pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancel_state);

	tt_pthread_mutex_lock(&endpoint->mutex);
	output_was_empty = stailq_empty(&endpoint->output);
	/* Move the whole staged batch into the consumer's queue. */
	stailq_concat(&endpoint->output, &pipe->input);
	tt_pthread_mutex_unlock(&endpoint->mutex);
	pipe->n_input = 0;

	/*
	 * Wake the consumer only on the empty -> non-empty transition;
	 * otherwise a wakeup is already pending or being processed.
	 */
	if (output_was_empty) {
		/* Count statistics */
		rmean_collect(cbus.stats, CBUS_STAT_EVENTS, 1);
		ev_async_send(pipe->endpoint->consumer, &pipe->endpoint->async);
	}
	tt_pthread_setcancelstate(old_cancel_state, NULL);
}
/**
 * Deliver everything staged via lcpipe_push() to the consumer.
 * Safe to call with an empty input queue - it is a no-op then.
 */
void
lcpipe_flush_input(struct lcpipe *pipe)
{
	/* Nothing staged - nothing to do. */
	if (pipe->n_input == 0)
		return;
	lcpipe_flush(pipe);
}
/**
 * Stage a message in the pipe input queue, flushing the whole
 * queue to the consumer once the staged count reaches max_input.
 */
static inline void
lcpipe_insert_input(struct lcpipe *pipe, struct cmsg *msg)
{
	/* The message must not be bound to a cpipe. */
	assert(msg->hop->pipe == NULL);
	stailq_add_tail_entry(&pipe->input, msg, fifo);
	if (++pipe->n_input >= pipe->max_input)
		lcpipe_flush(pipe);
}
/**
 * Stage a message in the input queue without delivering it;
 * delivery happens on a later lcpipe_flush_input() or
 * lcpipe_push_now() call.
 * @param msg cbus message; its current hop's pipe field must be NULL.
 */
void
lcpipe_push(struct lcpipe *pipe, struct cmsg *msg)
{
	lcpipe_insert_input(pipe, msg);
	/*
	 * Either the message stayed below the limit, or the insert
	 * hit max_input and flushed, resetting n_input to zero -
	 * so the counter is always below the limit here.
	 */
	assert(pipe->n_input < pipe->max_input);
}
/**
 * Stage a message and immediately flush the whole input queue to
 * the consumer, including messages staged earlier via lcpipe_push().
 * @param msg cbus message; its current hop's pipe field must be NULL.
 */
void
lcpipe_push_now(struct lcpipe *pipe, struct cmsg *msg)
{
	lcpipe_insert_input(pipe, msg);
	assert(pipe->n_input < pipe->max_input);
	lcpipe_flush(pipe);
}
/**
 * Disconnect the pipe from its endpoint and free its memory.
 * Any messages still staged in the input queue are delivered first,
 * followed by a poison message that drops one pipe reference on the
 * endpoint. NOTE(review): the poison cmsg is heap-allocated here and
 * presumably freed by the consumer side - confirm against
 * cbus_endpoint_poison_f.
 */
void
lcpipe_delete(struct lcpipe *pipe)
{
	/*
	 * The thread should not be canceled while mutex is locked.
	 * And everything else must be protected for consistency.
	 */
	int old_cancel_state;
	tt_pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancel_state);

	static const struct cmsg_hop route[1] = {
		{cbus_endpoint_poison_f, NULL}
	};
	struct cbus_endpoint *endpoint = pipe->endpoint;
	struct cmsg_poison *poison = xmalloc(sizeof(*poison));
	cmsg_init(&poison->msg, route);
	poison->endpoint = pipe->endpoint;

	/*
	 * Avoid the general purpose lcpipe_insert_input() since
	 * we want to control the way the poison message is
	 * delivered.
	 */
	tt_pthread_mutex_lock(&endpoint->mutex);
	/* Flush input */
	stailq_concat(&endpoint->output, &pipe->input);
	pipe->n_input = 0;
	/* Add the pipe shutdown message as the last one. */
	stailq_add_tail_entry(&endpoint->output, poison, msg.fifo);
	/* Count statistics */
	rmean_collect(cbus.stats, CBUS_STAT_EVENTS, 1);
	/*
	 * Keep the lock for the duration of ev_async_send():
	 * this will avoid a race condition between
	 * ev_async_send() and execution of the poison
	 * message, after which the endpoint may disappear.
	 */
	ev_async_send(endpoint->consumer, &endpoint->async);
	tt_pthread_mutex_unlock(&endpoint->mutex);
	tt_pthread_setcancelstate(old_cancel_state, NULL);

	free(pipe);
}
static void
cbus_create(struct cbus *bus)
{
......@@ -247,8 +396,21 @@ cbus_endpoint_create(struct cbus_endpoint *endpoint, const char *name,
}
int
cbus_endpoint_destroy(struct cbus_endpoint *endpoint,
void (*process_cb)(struct cbus_endpoint *endpoint))
cbus_endpoint_new(struct cbus_endpoint **endpoint, const char *name)
{
struct cbus_endpoint *new_endpoint = xmalloc(sizeof(*new_endpoint));
*endpoint = new_endpoint;
int err = cbus_endpoint_create(new_endpoint, name, fiber_schedule_cb,
fiber());
if (err)
free(new_endpoint);
return err;
}
static inline void
cbus_endpoint_destroy_inner(struct cbus_endpoint *endpoint,
void (*process_cb)(struct cbus_endpoint *endpoint))
{
tt_pthread_mutex_lock(&cbus.mutex);
/*
......@@ -263,7 +425,7 @@ cbus_endpoint_destroy(struct cbus_endpoint *endpoint,
process_cb(endpoint);
if (endpoint->n_pipes == 0 && stailq_empty(&endpoint->output))
break;
fiber_cond_wait(&endpoint->cond);
fiber_cond_wait(&endpoint->cond);
}
/*
......@@ -275,6 +437,22 @@ cbus_endpoint_destroy(struct cbus_endpoint *endpoint,
tt_pthread_mutex_destroy(&endpoint->mutex);
ev_async_stop(endpoint->consumer, &endpoint->async);
fiber_cond_destroy(&endpoint->cond);
}
/**
 * Delete a cbus endpoint allocated with cbus_endpoint_new():
 * disconnect it from the bus, poison and free its memory.
 * @retval 0 always (kept for API symmetry with cbus_endpoint_destroy).
 */
int
cbus_endpoint_delete(struct cbus_endpoint *endpoint)
{
	cbus_endpoint_destroy_inner(endpoint, cbus_process);
	/*
	 * Poison the struct BEFORE freeing it. The original order
	 * (free, then TRASH) wrote into freed memory, which is
	 * undefined behavior and defeats the poisoning entirely.
	 */
	TRASH(endpoint);
	free(endpoint);
	return 0;
}
/**
 * Disconnect the endpoint from the bus and release its resources.
 * The caller owns the endpoint memory: unlike cbus_endpoint_delete(),
 * the struct itself is not freed, only poisoned with TRASH().
 * @param process_cb callback used to drain pending messages while
 *        waiting for connected pipes to detach.
 * @retval 0 always.
 */
int
cbus_endpoint_destroy(struct cbus_endpoint *endpoint,
		      void (*process_cb)(struct cbus_endpoint *endpoint))
{
	cbus_endpoint_destroy_inner(endpoint, process_cb);
	TRASH(endpoint);
	return 0;
}
......
......@@ -299,6 +299,28 @@ int
cbus_endpoint_create(struct cbus_endpoint *endpoint, const char *name,
void (*fetch_cb)(ev_loop *, struct ev_watcher *, int), void *fetch_data);
/**
 * Allocate and initialize a cbus endpoint.
 * The endpoint is initialized with ready-to-use defaults and can be
 * used by 3rd party code.
 * @param endpoint destination for the newly allocated endpoint
 * @param name a destination name
 * @retval 0 for success
 * @retval 1 if an endpoint with the given name is already registered
 */
int
cbus_endpoint_new(struct cbus_endpoint **endpoint, const char *name);
/**
 * Delete a cbus endpoint. Disconnect the cord from the cbus and free the
 * endpoint data.
 * @param endpoint cbus endpoint; must have been allocated with a call to
 * `cbus_endpoint_new`.
 * @retval 0 for success
 * @retval 1 if there is a connected pipe or an unhandled message
 */
int
cbus_endpoint_delete(struct cbus_endpoint *endpoint);
/**
* One round for message fetch and deliver */
void
......@@ -453,6 +475,66 @@ cbus_unpair(struct cpipe *dest_pipe, struct cpipe *src_pipe,
void (*unpair_cb)(void *), void *unpair_arg,
void (*process_cb)(struct cbus_endpoint *));
/**
 * A uni-directional FIFO queue from any thread to a cord.
 * It is a light version of the cpipe: the on-flush triggers and the
 * producer-side libev event loop are omitted, since neither is
 * available to a 3rd party thread.
 */
struct lcpipe {
	/** Staging area for pushed messages */
	struct stailq input;
	/** Number of messages currently staged in input. */
	int n_input;
	/**
	 * When pushing messages, keep the staged input size under
	 * this limit (speeds up message delivery and reduces
	 * latency, while still keeping the bus mutex cold enough).
	 */
	int max_input;
	/**
	 * The cbus endpoint at the destination cord to handle
	 * flushed messages.
	 */
	struct cbus_endpoint *endpoint;
};
/**
* Create and initialize a pipe and connect it to the consumer.
* The call returns only when the consumer, identified by consumer name, has
* joined the bus.
**/
struct lcpipe *
lcpipe_new(const char *consumer);
/**
* Destroy a pipe with freeing up occupied memory.
* @param pipe lcpipe must have been allocated with a call to `lcpipe_new`.
**/
void
lcpipe_delete(struct lcpipe *pipe);
/**
* Flush all staged messages into the pipe and eventually to the
* consumer.
*/
void
lcpipe_flush_input(struct lcpipe *pipe);
/**
 * Push a message into the input queue without flushing.
 * @param msg cbus message; the pipe field of its current hop must be NULL.
 */
void
lcpipe_push(struct lcpipe *pipe, struct cmsg *msg);
/**
 * Push a single message and flush the input queue immediately.
 * @param msg cbus message; the pipe field of its current hop must be NULL.
 */
void
lcpipe_push_now(struct lcpipe *pipe, struct cmsg *msg);
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
......
......@@ -261,6 +261,9 @@ create_unit_test(PREFIX cbus_call
LIBRARIES core unit stat
)
add_executable(cbus_lcpipe.test cbus_lcpipe.c core_test_utils.c)
target_link_libraries(cbus_lcpipe.test core unit stat)
include(CheckSymbolExists)
check_symbol_exists(__GLIBC__ features.h GLIBC_USED)
if (GLIBC_USED)
......
#define UNIT_TAP_COMPATIBLE 1
#include <pthread.h>
#include <stdlib.h>

#include "memory.h"
#include "fiber.h"
#include "cbus.h"
#include "unit.h"
#include "trigger.h"
/**
* Test lcpipe message passing. lcpipe works with messages in two modes:
* 1) lcpipe_push_now - send message immediately
* 2) lcpipe_push - puts a message in a lcpipe without forwarding it.
* Forwarding must call explicitly by calling lcpipe_flush_input function.
*/
/** Counter of flush events. */
static int flushed_cnt = 0;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
/** Common callbacks. {{{ ------------------------------------- */
/* Count one processed message; runs in the consumer cord. */
static void
inc_counter_cb(struct cmsg *m)
{
	(void)m;
	++flushed_cnt;
	note("flush event, counter: %d\n", flushed_cnt);
}
/*
 * Count the message and signal the producer thread that the batch
 * it is waiting on has been processed.
 */
static void
inc_counter_and_signal_cb(struct cmsg *m)
{
	inc_counter_cb(m);
	tt_pthread_mutex_lock(&mutex);
	tt_pthread_cond_signal(&cond);
	tt_pthread_mutex_unlock(&mutex);
}
/*
 * Same as above, but additionally cancel the fiber executing this
 * message. NOTE(review): presumably this makes cbus_loop() in the
 * consumer cord return - confirm against cbus_loop's cancellation
 * handling.
 */
static void
inc_counter_and_signal_then_cancel_cb(struct cmsg *m)
{
	inc_counter_and_signal_cb(m);
	fiber_cancel(fiber());
}
/** }}} Common callbacks. ------------------------------------- */
/**
* Test push a single message.
* {{{ -----------------------------------------------------------
*/
/*
 * Push a single message with lcpipe_push_now() and wait until the
 * consumer cord has processed it.
 */
static void
test_single_msg(struct lcpipe *pipe)
{
	note("\n*** Test single message ***\n");
	static struct cmsg_hop test_event_route[] = {
		{ inc_counter_and_signal_cb, NULL },
	};
	static struct cmsg test_msg;
	cmsg_init(&test_msg, test_event_route);
	/*
	 * Take the mutex before the push so the consumer cannot
	 * signal the condition before we are waiting on it.
	 */
	tt_pthread_mutex_lock(&mutex);
	lcpipe_push_now(pipe, &test_msg);
	tt_pthread_cond_wait(&cond, &mutex);
	tt_pthread_mutex_unlock(&mutex);
	is(flushed_cnt, 1, "1 flush after %s", __func__);
}
/** }}} Test single message. ---------------------------------- */
/**
* Test insert a batch of messages and flush it.
* {{{ -----------------------------------------------------------
*/
/*
 * Stage five messages with lcpipe_push() (no delivery yet), then
 * deliver them all at once with lcpipe_flush_input(). Only the
 * last message signals the condition, so one wait covers the batch.
 */
static void
test_batch_msg(struct lcpipe *pipe)
{
	note("\n*** Test batch of messages ***\n");
	static struct cmsg_hop test_event_routes[][1] = {
		{{inc_counter_cb, NULL}},
		{{inc_counter_cb, NULL}},
		{{inc_counter_cb, NULL}},
		{{inc_counter_cb, NULL}},
		{{inc_counter_and_signal_cb, NULL}},
	};
	static struct cmsg test_msg[5];
	/*
	 * Pushing before the mutex is taken is safe: lcpipe_push()
	 * only stages, nothing reaches the consumer until the flush.
	 */
	for (unsigned i = 0; i < 5; i++) {
		cmsg_init(&test_msg[i], test_event_routes[i]);
		lcpipe_push(pipe, &test_msg[i]);
	}
	tt_pthread_mutex_lock(&mutex);
	lcpipe_flush_input(pipe);
	tt_pthread_cond_wait(&cond, &mutex);
	tt_pthread_mutex_unlock(&mutex);
	/* 1 from the previous test + 5 from this batch. */
	is(flushed_cnt, 6, "6 flush after %s", __func__);
}
/** }}} Test a batch of messages. ---------------------------------- */
/**
* Test sequence of lcpipe_push and lcpipe_push_now functions. lcpipe_push_now
* must release messages previously inserted by lcpipe_push function.
* {{{ -----------------------------------------------------------
*/
/*
 * Verify that lcpipe_push_now() also delivers messages previously
 * staged with lcpipe_push(): both callbacks must fire, raising the
 * counter from 6 to 8. The second message also cancels the consumer
 * fiber, ending the test loop.
 */
static void
test_push_then_push_now(struct lcpipe *pipe)
{
	note("\n*** Test sequence of lcpipe_push and lcpipe_push_now ***\n");
	static struct cmsg_hop test_event_route_1[] = {
		{ inc_counter_cb, NULL },
	};
	static struct cmsg test_msg_1;
	cmsg_init(&test_msg_1, test_event_route_1);
	static struct cmsg_hop test_event_route_2[] = {
		{ inc_counter_and_signal_then_cancel_cb, NULL },
	};
	static struct cmsg test_msg_2;
	cmsg_init(&test_msg_2, test_event_route_2);
	/* Lock first so the signal cannot be missed. */
	tt_pthread_mutex_lock(&mutex);
	lcpipe_push(pipe, &test_msg_1);
	lcpipe_push_now(pipe, &test_msg_2);
	tt_pthread_cond_wait(&cond, &mutex);
	tt_pthread_mutex_unlock(&mutex);
	is(flushed_cnt, 8, "8 flush after %s", __func__);
}
/** }}} Test sequence of lcpipe_push and lcpipe_push_now functions. ------ */
/** Worker routines. {{{ -------------------------------------- */
/*
 * Producer thread body: connect an lcpipe to the "main" endpoint
 * and run all three message-passing tests, then report the TAP
 * plan result through pthread_exit().
 */
static void *
worker_f(void *data)
{
	char *name = (char *)data;
	plan(3);
	header();
	note("start new worker, thread %s\n", name);
	/* Blocks until the consumer cord has registered "main". */
	struct lcpipe *pipe = lcpipe_new("main");
	test_single_msg(pipe);
	test_batch_msg(pipe);
	test_push_then_push_now(pipe);
	lcpipe_delete(pipe);
	int rc = check_plan();
	pthread_exit((void *)(intptr_t)rc);
}
/*
 * Spawn the producer thread that drives all lcpipe tests.
 * Abort loudly if the thread cannot be created: the original code
 * ignored pthread_create()'s return value, and on failure the test
 * would hang forever in cbus_loop() waiting for messages that will
 * never arrive.
 */
static void
worker_start(pthread_t *handle)
{
	if (pthread_create(handle, NULL, worker_f, "X") != 0)
		abort();
}
/* Join the producer thread and return its TAP plan result. */
static int
worker_stop(pthread_t handle)
{
	note("finish worker\n");
	void *retval = NULL;
	pthread_join(handle, &retval);
	return (int)(intptr_t)retval;
}
/** }}} Worker routines. -------------------------------------- */
/*
 * Consumer-side fiber: create the "main" endpoint the worker thread
 * connects to, process its messages in cbus_loop(), then join the
 * worker and return its TAP result. NOTE(review): cbus_loop()
 * presumably returns once the worker's final message cancels this
 * fiber (inc_counter_and_signal_then_cancel_cb) - confirm.
 */
static int
main_f(va_list ap)
{
	(void)ap;
	struct cbus_endpoint endpoint;
	cbus_endpoint_create(&endpoint, "main", fiber_schedule_cb, fiber());
	pthread_t th;
	worker_start(&th);
	cbus_loop(&endpoint);
	/* Worker is joined only after all its messages are handled. */
	int rc = worker_stop(th);
	cbus_endpoint_destroy(&endpoint, cbus_process);
	ev_break(loop(), EVBREAK_ALL);
	return rc;
}
/*
 * Test entry point: bring up the fiber/cbus runtime, run the
 * consumer logic (main_f) inside a joinable fiber until it finishes,
 * then tear everything down and propagate the TAP result code.
 */
int
main(void)
{
	memory_init();
	fiber_init(fiber_c_invoke);
	cbus_init();
	struct fiber *main_fiber = fiber_new("main", main_f);
	assert(main_fiber != NULL);
	fiber_set_joinable(main_fiber, true);
	fiber_wakeup(main_fiber);
	ev_run(loop(), 0);
	note("finish %s loop\n", __func__);
	int rc = fiber_join(main_fiber);
	/* Tear down in reverse order of initialization. */
	cbus_free();
	fiber_free();
	memory_free();
	footer();
	return rc;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment