From d653d0886c607be53048e81176a6c1f6ed6e6034 Mon Sep 17 00:00:00 2001 From: godzie44 <godzie@yandex.ru> Date: Mon, 22 May 2023 13:59:28 +0300 Subject: [PATCH] cbus: introduce lcpipe - light cpipe Introduced a new type of cbus pipe - lcpipe. The current pipe in the cbus - cpipe, has a number of limitations, first of all - the cpipe cannot be used from the 3rd party threads, cpipe only works as a channel between two cords. That why lcpipe is needed. Its main responsibility - create channel between any thread and tarantool cord. Internally lcpipe is a cpipe, but: - on flush triggers removed, cause triggers use thread-local mem-pool, this is not possible on a third party thread - producer event loop removed, cause there is no libev event loop in third party thread Also, lcpipe interface is exported to the outside world. NO_DOC=core feature --- changelogs/unreleased/fork-3-lcpipe.md | 8 + extra/exports | 11 ++ src/lib/core/cbus.c | 204 +++++++++++++++++++++-- src/lib/core/cbus.h | 82 ++++++++++ test/unit/CMakeLists.txt | 3 + test/unit/cbus_lcpipe.c | 218 +++++++++++++++++++++++++ 6 files changed, 513 insertions(+), 13 deletions(-) create mode 100644 changelogs/unreleased/fork-3-lcpipe.md create mode 100644 test/unit/cbus_lcpipe.c diff --git a/changelogs/unreleased/fork-3-lcpipe.md b/changelogs/unreleased/fork-3-lcpipe.md new file mode 100644 index 0000000000..e8ebc76ef7 --- /dev/null +++ b/changelogs/unreleased/fork-3-lcpipe.md @@ -0,0 +1,8 @@ +## feature/build + +* export cbus interface: + * cbus_endpoint_new + * cbus_endpoint_delete + * cbus_loop + * cbus_process +* add new type of cbus pipe: lcpipe. Lcpipe main purpose - send messages from 3rtd party tread into tarantool cord diff --git a/extra/exports b/extra/exports index d59221142f..b4330ea90d 100644 --- a/extra/exports +++ b/extra/exports @@ -624,3 +624,14 @@ fiber_channel_get_msg_timeout fiber_channel_has_readers fiber_channel_has_writers fiber_channel_new + +# picodata cbus +cbus_endpoint_delete +cbus_endpoint_new +cbus_loop +cbus_process +lcpipe_delete +lcpipe_flush_input +lcpipe_new +lcpipe_push +lcpipe_push_now diff --git a/src/lib/core/cbus.c b/src/lib/core/cbus.c index 4724dd6468..81fc0f615d 100644 --- a/src/lib/core/cbus.c +++ b/src/lib/core/cbus.c @@ -87,6 +87,25 @@ static void cpipe_flush_cb(ev_loop * /* loop */, struct ev_async *watcher, int /* events */); +/** + * Acquire cbus endpoint, identified by consumer name. + * The call returns when the consumer has joined the bus. + */ +static inline void +acquire_consumer(struct cbus_endpoint **pipe_endpoint, const char *name) +{ + tt_pthread_mutex_lock(&cbus.mutex); + struct cbus_endpoint *endpoint = + cbus_find_endpoint_locked(&cbus, name); + while (endpoint == NULL) { + tt_pthread_cond_wait(&cbus.cond, &cbus.mutex); + endpoint = cbus_find_endpoint_locked(&cbus, name); + } + *pipe_endpoint = endpoint; + ++(*pipe_endpoint)->n_pipes; + tt_pthread_mutex_unlock(&cbus.mutex); +} + void cpipe_create(struct cpipe *pipe, const char *consumer) { @@ -100,16 +119,7 @@ cpipe_create(struct cpipe *pipe, const char *consumer) pipe->flush_input.data = pipe; rlist_create(&pipe->on_flush); - tt_pthread_mutex_lock(&cbus.mutex); - struct cbus_endpoint *endpoint = - cbus_find_endpoint_locked(&cbus, consumer); - while (endpoint == NULL) { - tt_pthread_cond_wait(&cbus.cond, &cbus.mutex); - endpoint = cbus_find_endpoint_locked(&cbus, consumer); - } - pipe->endpoint = endpoint; - ++pipe->endpoint->n_pipes; - tt_pthread_mutex_unlock(&cbus.mutex); + acquire_consumer(&pipe->endpoint, consumer); } struct cmsg_poison { @@ -177,6 +187,145 @@ cpipe_destroy(struct cpipe *pipe) TRASH(pipe); } +struct lcpipe * +lcpipe_new(const char *consumer) +{ + struct lcpipe *pipe = xmalloc(sizeof(*pipe)); + + stailq_create(&pipe->input); + + pipe->n_input = 0; + pipe->max_input = INT_MAX; + + acquire_consumer(&pipe->endpoint, consumer); + + return pipe; +} + +/** + * Flush all staged messages into consumer output and wake up it after. + */ +static void +lcpipe_flush(struct lcpipe *pipe) +{ + struct cbus_endpoint *endpoint = pipe->endpoint; + if (pipe->n_input == 0) + return; + + /* Trigger task processing when the queue becomes non-empty. */ + bool output_was_empty; + + /* + * We need to set a thread cancellation guard, because + * another thread may cancel the current thread + * (write() is a cancellation point in ev_async_send) + * and the activation of the ev_async watcher + * through ev_async_send will fail. + */ + + int old_cancel_state; + tt_pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancel_state); + + tt_pthread_mutex_lock(&endpoint->mutex); + output_was_empty = stailq_empty(&endpoint->output); + /** Flush input */ + stailq_concat(&endpoint->output, &pipe->input); + tt_pthread_mutex_unlock(&endpoint->mutex); + + pipe->n_input = 0; + if (output_was_empty) { + /* Count statistics */ + rmean_collect(cbus.stats, CBUS_STAT_EVENTS, 1); + + ev_async_send(pipe->endpoint->consumer, &pipe->endpoint->async); + } + + tt_pthread_setcancelstate(old_cancel_state, NULL); +} + +void +lcpipe_flush_input(struct lcpipe *pipe) +{ + /** Flush may be called with no input. */ + if (pipe->n_input > 0) { + lcpipe_flush(pipe); + } +} + +/** + * Insert message into pipe input queue. + */ +static inline void +lcpipe_insert_input(struct lcpipe *pipe, struct cmsg *msg) +{ + assert(msg->hop->pipe == NULL); + stailq_add_tail_entry(&pipe->input, msg, fifo); + pipe->n_input++; + if (pipe->n_input >= pipe->max_input) { + lcpipe_flush(pipe); + } +} + +void +lcpipe_push(struct lcpipe *pipe, struct cmsg *msg) +{ + lcpipe_insert_input(pipe, msg); + assert(pipe->n_input < pipe->max_input); +} + +void +lcpipe_push_now(struct lcpipe *pipe, struct cmsg *msg) +{ + lcpipe_insert_input(pipe, msg); + assert(pipe->n_input < pipe->max_input); + lcpipe_flush(pipe); +} + +void +lcpipe_delete(struct lcpipe *pipe) +{ + /* + * The thread should not be canceled while mutex is locked. + * And everything else must be protected for consistency. + */ + int old_cancel_state; + tt_pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancel_state); + + static const struct cmsg_hop route[1] = { + {cbus_endpoint_poison_f, NULL} + }; + + struct cbus_endpoint *endpoint = pipe->endpoint; + struct cmsg_poison *poison = xmalloc(sizeof(*poison)); + cmsg_init(&poison->msg, route); + poison->endpoint = pipe->endpoint; + /* + * Avoid the general purpose lcpipe_insert_input() since + * we want to control the way the poison message is + * delivered. + */ + tt_pthread_mutex_lock(&endpoint->mutex); + /* Flush input */ + stailq_concat(&endpoint->output, &pipe->input); + pipe->n_input = 0; + /* Add the pipe shutdown message as the last one. */ + stailq_add_tail_entry(&endpoint->output, poison, msg.fifo); + /* Count statistics */ + rmean_collect(cbus.stats, CBUS_STAT_EVENTS, 1); + /* + * Keep the lock for the duration of ev_async_send(): + * this will avoid a race condition between + * ev_async_send() and execution of the poison + * message, after which the endpoint may disappear. + */ + ev_async_send(endpoint->consumer, &endpoint->async); + tt_pthread_mutex_unlock(&endpoint->mutex); + + tt_pthread_setcancelstate(old_cancel_state, NULL); + + free(pipe); +} + static void cbus_create(struct cbus *bus) { @@ -247,8 +396,21 @@ cbus_endpoint_create(struct cbus_endpoint *endpoint, const char *name, } int -cbus_endpoint_destroy(struct cbus_endpoint *endpoint, - void (*process_cb)(struct cbus_endpoint *endpoint)) +cbus_endpoint_new(struct cbus_endpoint **endpoint, const char *name) +{ + struct cbus_endpoint *new_endpoint = xmalloc(sizeof(*new_endpoint)); + *endpoint = new_endpoint; + + int err = cbus_endpoint_create(new_endpoint, name, fiber_schedule_cb, + fiber()); + if (err) + free(new_endpoint); + return err; +} + +static inline void +cbus_endpoint_destroy_inner(struct cbus_endpoint *endpoint, + void (*process_cb)(struct cbus_endpoint *endpoint)) { tt_pthread_mutex_lock(&cbus.mutex); /* @@ -263,7 +425,7 @@ cbus_endpoint_destroy(struct cbus_endpoint *endpoint, process_cb(endpoint); if (endpoint->n_pipes == 0 && stailq_empty(&endpoint->output)) break; - fiber_cond_wait(&endpoint->cond); + fiber_cond_wait(&endpoint->cond); } /* @@ -275,6 +437,22 @@ cbus_endpoint_destroy(struct cbus_endpoint *endpoint, tt_pthread_mutex_destroy(&endpoint->mutex); ev_async_stop(endpoint->consumer, &endpoint->async); fiber_cond_destroy(&endpoint->cond); +} + +int +cbus_endpoint_delete(struct cbus_endpoint *endpoint) +{ + cbus_endpoint_destroy_inner(endpoint, cbus_process); + free(endpoint); + TRASH(endpoint); + return 0; +} + +int +cbus_endpoint_destroy(struct cbus_endpoint *endpoint, + void (*process_cb)(struct cbus_endpoint *endpoint)) +{ + cbus_endpoint_destroy_inner(endpoint, process_cb); TRASH(endpoint); return 0; } diff --git a/src/lib/core/cbus.h b/src/lib/core/cbus.h index 6b06312841..c7dd96bd88 100644 --- a/src/lib/core/cbus.h +++ b/src/lib/core/cbus.h @@ -299,6 +299,28 @@ int cbus_endpoint_create(struct cbus_endpoint *endpoint, const char *name, void (*fetch_cb)(ev_loop *, struct ev_watcher *, int), void *fetch_data); +/** + * Allocate and initialize cbus endpoint. + * Endpoint initialized by ready for use defaults. Can be using by 3rtd party + * code. + * @param endpoint cbus endpoint dst + * @param name a destination name + * @retval 0 for success + * @retval 1 if endpoint with given name already registered + */ +int +cbus_endpoint_new(struct cbus_endpoint **endpoint, const char *name); + +/** + * Delete cbus endpoint. Disconnect the cord from cbus and free endpoint data. + * @param endpoint cbus endpoint must have been allocated with a call to + * `cbus_endpoint_new`. + * @retval 0 for success + * @retval 1 if there is connected pipe or unhandled message` + */ +int +cbus_endpoint_delete(struct cbus_endpoint *endpoint); + /** * One round for message fetch and deliver */ void @@ -453,6 +475,66 @@ cbus_unpair(struct cpipe *dest_pipe, struct cpipe *src_pipe, void (*unpair_cb)(void *), void *unpair_arg, void (*process_cb)(struct cbus_endpoint *)); +/** + * A uni-directional FIFO queue from any thread to cord. + * It is a light version of the cpipe, since some features are not needed for + * 3td party threads or they cannot be implemented. + * + */ +struct lcpipe { + /** Staging area for pushed messages */ + struct stailq input; + /** Counters are useful for finer-grained scheduling. */ + int n_input; + /** + * When pushing messages, keep the staged input size under + * this limit (speeds up message delivery and reduces + * latency, while still keeping the bus mutex cold enough). + */ + int max_input; + /** + * The cbus endpoint at the destination cord to handle + * flushed messages. + */ + struct cbus_endpoint *endpoint; +}; + +/** + * Create and initialize a pipe and connect it to the consumer. + * The call returns only when the consumer, identified by consumer name, has + * joined the bus. + **/ +struct lcpipe * +lcpipe_new(const char *consumer); + +/** + * Destroy a pipe with freeing up occupied memory. + * @param pipe lcpipe must have been allocated with a call to `lcpipe_new`. + **/ +void +lcpipe_delete(struct lcpipe *pipe); + +/** + * Flush all staged messages into the pipe and eventually to the + * consumer. + */ +void +lcpipe_flush_input(struct lcpipe *pipe); + +/** + * Push a message into input queue without flushing. + * @param msg cbus message, pipe in message hop must be set at NULL. + */ +void +lcpipe_push(struct lcpipe *pipe, struct cmsg *msg); + +/** + * Push a single message and flush input queue immediately. + * @param msg cbus message, pipe in message hop must be set at NULL. + */ +void +lcpipe_push_now(struct lcpipe *pipe, struct cmsg *msg); + #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt index e14d37b409..2bf5f47693 100644 --- a/test/unit/CMakeLists.txt +++ b/test/unit/CMakeLists.txt @@ -261,6 +261,9 @@ create_unit_test(PREFIX cbus_call LIBRARIES core unit stat ) +add_executable(cbus_lcpipe.test cbus_lcpipe.c core_test_utils.c) +target_link_libraries(cbus_lcpipe.test core unit stat) + include(CheckSymbolExists) check_symbol_exists(__GLIBC__ features.h GLIBC_USED) if (GLIBC_USED) diff --git a/test/unit/cbus_lcpipe.c b/test/unit/cbus_lcpipe.c new file mode 100644 index 0000000000..6682a191d2 --- /dev/null +++ b/test/unit/cbus_lcpipe.c @@ -0,0 +1,218 @@ +#define UNIT_TAP_COMPATIBLE 1 +#include "memory.h" +#include "fiber.h" +#include "cbus.h" +#include "unit.h" +#include "trigger.h" + +/** + * Test lcpipe message passing. lcpipe works with messages in two modes: + * 1) lcpipe_push_now - send message immediately + * 2) lcpipe_push - puts a message in a lcpipe without forwarding it. + * Forwarding must call explicitly by calling lcpipe_flush_input function. + */ + +/** Counter of flush events. */ +static int flushed_cnt = 0; + +static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; +static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + +/** Common callbacks. {{{ ------------------------------------- */ + +static void +inc_counter_cb(struct cmsg *m) +{ + (void)m; + ++flushed_cnt; + note("flush event, counter: %d\n", flushed_cnt); +} + +static void +inc_counter_and_signal_cb(struct cmsg *m) +{ + inc_counter_cb(m); + tt_pthread_mutex_lock(&mutex); + tt_pthread_cond_signal(&cond); + tt_pthread_mutex_unlock(&mutex); +} + +static void +inc_counter_and_signal_then_cancel_cb(struct cmsg *m) +{ + inc_counter_and_signal_cb(m); + fiber_cancel(fiber()); +} + +/** }}} Common callbacks. ------------------------------------- */ + +/** + * Test push a single message. + * {{{ ----------------------------------------------------------- + */ + +static void +test_single_msg(struct lcpipe *pipe) +{ + note("\n*** Test single message ***\n"); + static struct cmsg_hop test_event_route[] = { + { inc_counter_and_signal_cb, NULL }, + }; + static struct cmsg test_msg; + cmsg_init(&test_msg, test_event_route); + + tt_pthread_mutex_lock(&mutex); + + lcpipe_push_now(pipe, &test_msg); + + tt_pthread_cond_wait(&cond, &mutex); + tt_pthread_mutex_unlock(&mutex); + + is(flushed_cnt, 1, "1 flush after %s", __func__); +} + +/** }}} Test single message. ---------------------------------- */ + +/** + * Test insert a batch of messages and flush it. + * {{{ ----------------------------------------------------------- + */ + +static void +test_batch_msg(struct lcpipe *pipe) +{ + note("\n*** Test batch of messages ***\n"); + static struct cmsg_hop test_event_routes[][1] = { + {{inc_counter_cb, NULL}}, + {{inc_counter_cb, NULL}}, + {{inc_counter_cb, NULL}}, + {{inc_counter_cb, NULL}}, + {{inc_counter_and_signal_cb, NULL}}, + }; + static struct cmsg test_msg[5]; + for (unsigned i = 0; i < 5; i++) { + cmsg_init(&test_msg[i], test_event_routes[i]); + lcpipe_push(pipe, &test_msg[i]); + } + + tt_pthread_mutex_lock(&mutex); + + lcpipe_flush_input(pipe); + + tt_pthread_cond_wait(&cond, &mutex); + tt_pthread_mutex_unlock(&mutex); + + is(flushed_cnt, 6, "6 flush after %s", __func__); +} + +/** }}} Test a batch of messages. ---------------------------------- */ + +/** + * Test sequence of lcpipe_push and lcpipe_push_now functions. lcpipe_push_now + * must release messages previously inserted by lcpipe_push function. + * {{{ ----------------------------------------------------------- + */ + +static void +test_push_then_push_now(struct lcpipe *pipe) +{ + note("\n*** Test sequence of lcpipe_push and lcpipe_push_now ***\n"); + static struct cmsg_hop test_event_route_1[] = { + { inc_counter_cb, NULL }, + }; + static struct cmsg test_msg_1; + cmsg_init(&test_msg_1, test_event_route_1); + + static struct cmsg_hop test_event_route_2[] = { + { inc_counter_and_signal_then_cancel_cb, NULL }, + }; + static struct cmsg test_msg_2; + cmsg_init(&test_msg_2, test_event_route_2); + + tt_pthread_mutex_lock(&mutex); + + lcpipe_push(pipe, &test_msg_1); + lcpipe_push_now(pipe, &test_msg_2); + + tt_pthread_cond_wait(&cond, &mutex); + tt_pthread_mutex_unlock(&mutex); + + is(flushed_cnt, 8, "8 flush after %s", __func__); +} + +/** }}} Test sequence of lcpipe_push and lcpipe_push_now functions. ------ */ + +/** Worker routines. {{{ -------------------------------------- */ + +static void * +worker_f(void *data) +{ + char *name = (char *)data; + + plan(3); + header(); + + note("start new worker, thread %s\n", name); + + struct lcpipe *pipe = lcpipe_new("main"); + test_single_msg(pipe); + test_batch_msg(pipe); + test_push_then_push_now(pipe); + lcpipe_delete(pipe); + + int rc = check_plan(); + pthread_exit((void *)(intptr_t)rc); +} + +static void +worker_start(pthread_t *handle) +{ + pthread_create(handle, NULL, worker_f, "X"); +} + +static int +worker_stop(pthread_t handle) +{ + note("finish worker\n"); + void *rc = NULL; + pthread_join(handle, &rc); + return (int)(intptr_t)rc; +} + +/** }}} Worker routines. -------------------------------------- */ + +static int +main_f(va_list ap) +{ + (void)ap; + struct cbus_endpoint endpoint; + cbus_endpoint_create(&endpoint, "main", fiber_schedule_cb, fiber()); + pthread_t th; + worker_start(&th); + cbus_loop(&endpoint); + int rc = worker_stop(th); + cbus_endpoint_destroy(&endpoint, cbus_process); + ev_break(loop(), EVBREAK_ALL); + return rc; +} + +int +main(void) +{ + memory_init(); + fiber_init(fiber_c_invoke); + cbus_init(); + struct fiber *main_fiber = fiber_new("main", main_f); + assert(main_fiber != NULL); + fiber_set_joinable(main_fiber, true); + fiber_wakeup(main_fiber); + ev_run(loop(), 0); + note("finish %s loop\n", __func__); + int rc = fiber_join(main_fiber); + cbus_free(); + fiber_free(); + memory_free(); + + footer(); + return rc; +} -- GitLab