From 8ce1e24b30677d5b5b3e44d4f912ffe6ebc56c27 Mon Sep 17 00:00:00 2001 From: Aleksey Demakov <ademakov@gmail.com> Date: Mon, 16 Apr 2012 16:08:54 +0400 Subject: [PATCH] Remove fiber inbox code as it is no longer used. --- core/fiber.m | 218 +---------------------------------- core/log_io_remote.m | 2 +- core/replication.m | 2 +- core/tarantool_lua.m | 2 +- include/fiber.h | 19 +-- mod/box/memcached-grammar.m | 6 +- mod/box/memcached-grammar.rl | 2 +- mod/box/memcached.m | 2 +- 8 files changed, 12 insertions(+), 241 deletions(-) diff --git a/core/fiber.m b/core/fiber.m index 1abbb7063b..a0e840fdb1 100644 --- a/core/fiber.m +++ b/core/fiber.m @@ -365,14 +365,6 @@ unregister_fid(struct fiber *fiber) mh_i32ptr_del(fibers_registry, k); } -static void -clear_inbox(struct fiber *fiber) -{ - for (size_t i = 0; i < fiber->inbox->size; i++) - fiber->inbox->ring[i] = NULL; - fiber->inbox->head = fiber->inbox->tail = 0; -} - static void fiber_alloc(struct fiber *fiber) { @@ -382,7 +374,6 @@ fiber_alloc(struct fiber *fiber) fiber->cleanup = tbuf_alloc(fiber->gc_pool); fiber->iov_cnt = 0; - clear_inbox(fiber); } void @@ -435,15 +426,6 @@ fiber_gc(void) } fiber->iov = new_iov; - for (int i = 0; i < fiber->inbox->size; i++) { - struct msg *ri = fiber->inbox->ring[i]; - if (ri != NULL) { - fiber->inbox->ring[i] = palloc(fiber->gc_pool, sizeof(*ri)); - fiber->inbox->ring[i]->sender_fid = ri->sender_fid; - fiber->inbox->ring[i]->msg = tbuf_clone(fiber->gc_pool, ri->msg); - } - } - prelease(ex_pool); } @@ -504,11 +486,9 @@ fiber_set_name(struct fiber *fiber, const char *name) /* fiber never dies, just become zombie */ struct fiber * -fiber_create(const char *name, int fd, int inbox_size, void (*f) (void *), void *f_data) +fiber_create(const char *name, int fd, void (*f) (void *), void *f_data) { struct fiber *fiber = NULL; - if (inbox_size <= 0) - inbox_size = 64; if (!SLIST_EMPTY(&zombie_fibers)) { fiber = SLIST_FIRST(&zombie_fibers); @@ -523,9 +503,6 @@ fiber_create(const char *name, int fd, int inbox_size, void (*f) (void *), void return NULL; fiber->gc_pool = palloc_create_pool(""); - fiber->inbox = palloc(eter_pool, (sizeof(*fiber->inbox) + - inbox_size * sizeof(struct tbuf *))); - fiber->inbox->size = inbox_size; fiber_alloc(fiber); ev_init(&fiber->io, (void *)ev_schedule); @@ -626,73 +603,6 @@ fiber_close(void) return r; } -static int -ring_size(struct ring *inbox) -{ - return (inbox->size + inbox->head - inbox->tail) % inbox->size; -} - -int -inbox_size(struct fiber *recipient) -{ - return ring_size(recipient->inbox); -} - -/** - * @note: this is a cancellation point (@sa fiber_testcancel()) - */ - -void -wait_inbox(struct fiber *recipient) -{ - while (ring_size(recipient->inbox) == 0) { - recipient->flags |= FIBER_READING_INBOX; - fiber_yield(); - recipient->flags &= ~FIBER_READING_INBOX; - fiber_testcancel(); - } -} - -bool -write_inbox(struct fiber *recipient, struct tbuf *msg) -{ - struct ring *inbox = recipient->inbox; - if (ring_size(inbox) == inbox->size - 1) - return false; - - inbox->ring[inbox->head] = palloc(recipient->gc_pool, sizeof(struct msg)); - inbox->ring[inbox->head]->sender_fid = fiber->fid; - inbox->ring[inbox->head]->msg = tbuf_clone(recipient->gc_pool, msg); - inbox->head = (inbox->head + 1) % inbox->size; - - if (recipient->flags & FIBER_READING_INBOX) - fiber_call(recipient); - return true; -} - - -/** - * @note: this is a cancellation point (@sa fiber_testcancel()) - */ - -struct msg * -read_inbox(void) -{ - struct ring *restrict inbox = fiber->inbox; - while (ring_size(inbox) == 0) { - fiber->flags |= FIBER_READING_INBOX; - fiber_yield(); - fiber->flags &= ~FIBER_READING_INBOX; - fiber_testcancel(); - } - - struct msg *msg = inbox->ring[inbox->tail]; - inbox->ring[inbox->tail] = NULL; - inbox->tail = (inbox->tail + 1) % inbox->size; - - return msg; -} - /** * Read at least at_least bytes from a socket. * @@ -998,127 +908,6 @@ blocking_loop(int fd, struct tbuf *(*handler) (void *state, struct tbuf *), void exit(result); } -static void -inbox2sock(void *_data __attribute__((unused))) -{ - struct tbuf *msg, *out; - struct msg *m; - u32 len; - - for (;;) { - out = tbuf_alloc(fiber->gc_pool); - - do { - m = read_inbox(); - msg = tbuf_alloc(fiber->gc_pool); - - /* TODO: do not copy message twice */ - tbuf_reserve(msg, sizeof(struct fiber_msg) + m->msg->size); - fiber_msg(msg)->fid = m->sender_fid; - fiber_msg(msg)->data_len = m->msg->size; - memcpy(fiber_msg(msg)->data, m->msg->data, m->msg->size); - len = htonl(msg->size); - - tbuf_append(out, &len, sizeof(len)); - tbuf_append(out, msg->data, msg->size); - } while (ring_size(fiber->inbox) > 0); - - if (fiber_write(out->data, out->size) != out->size) - panic("child is dead"); - fiber_gc(); - } -} - -static void -sock2inbox(void *_data __attribute__((unused))) -{ - struct tbuf *msg, *msg_body; - struct fiber *recipient; - u32 len; - - for (;;) { - if (fiber->rbuf->size < sizeof(len)) { - if (fiber_bread(fiber->rbuf, sizeof(len)) <= 0) - panic("child is dead"); - } - - len = read_u32(fiber->rbuf); - - len = ntohl(len); - if (fiber->rbuf->size < len) { - if (fiber_bread(fiber->rbuf, len) <= 0) - panic("child is dead"); - } - - msg = tbuf_split(fiber->rbuf, len); - recipient = fiber_find(fiber_msg(msg)->fid); - if (recipient == NULL) { - say_error("recipient is lost"); - continue; - } - - msg_body = tbuf_alloc(recipient->gc_pool); - tbuf_append(msg_body, fiber_msg(msg)->data, fiber_msg(msg)->data_len); - write_inbox(recipient, msg_body); - fiber_gc(); - } -} - -struct child * -spawn_child(const char *name, int inbox_size, struct tbuf *(*handler) (void *, struct tbuf *), - void *state) -{ - char proxy_name[FIBER_NAME_MAXLEN]; - int socks[2]; - int pid; - - if (socketpair(AF_UNIX, SOCK_STREAM, 0, socks) == -1) { - say_syserror("socketpair"); - return NULL; - } - - if ((pid = fork()) == -1) { - say_syserror("fork"); - return NULL; - } - - if (pid) { - close(socks[0]); - if (set_nonblock(socks[1]) == -1) - return NULL; - - struct child *c = palloc(eter_pool, sizeof(*c)); - c->pid = pid; - - snprintf(proxy_name, sizeof(proxy_name), "%s/sock2inbox", name); - c->in = fiber_create(proxy_name, socks[1], inbox_size, sock2inbox, NULL); - fiber_call(c->in); - snprintf(proxy_name, sizeof(proxy_name), "%s/inbox2sock", name); - c->out = fiber_create(proxy_name, socks[1], inbox_size, inbox2sock, NULL); - c->out->flags |= FIBER_READING_INBOX; - return c; - } else { - /* it is safer to tell libev about fork, even - * if child wont' use it. */ - ev_default_fork(); - ev_loop(EVLOOP_NONBLOCK); - - char child_name[FIBER_NAME_MAXLEN]; - /* - * Move to an own process group, to not receive - * signals from the controlling tty. - */ - setpgid(0, 0); - /* destroying salloc in tarantool_free() */ - close_all_xcpt(2, socks[0], sayfd); - snprintf(child_name, sizeof(child_name), "%s/child", name); - fiber_set_name(&sched, child_name); - set_proc_title("%s%s", name, custom_proc_title); - say_crit("%s initialized", name); - blocking_loop(socks[0], handler, state); - } -} - static void tcp_server_handler(void *data) { @@ -1154,7 +943,7 @@ tcp_server_handler(void *data) } snprintf(name, sizeof(name), "%i/handler", server->port); - h = fiber_create(name, fd, -1, server->handler, server->data); + h = fiber_create(name, fd, server->handler, server->data); if (h == NULL) { say_error("can't create handler fiber, dropping client connection"); close(fd); @@ -1187,7 +976,7 @@ fiber_server(const char *name, int port, void (*handler) (void *data), void *dat server->port = port; server->handler = handler; server->on_bind = on_bind; - s = fiber_create(server_name, -1, -1, tcp_server_handler, server); + s = fiber_create(server_name, -1, tcp_server_handler, server); fiber_call(s); /* give a handler a chance */ return s; @@ -1312,7 +1101,6 @@ fiber_info(struct tbuf *out) tbuf_printf(out, " - fid: %4i" CRLF, fiber->fid); tbuf_printf(out, " csw: %i" CRLF, fiber->csw); tbuf_printf(out, " name: %s" CRLF, fiber->name); - tbuf_printf(out, " inbox: %i" CRLF, ring_size(fiber->inbox)); tbuf_printf(out, " fd: %4i" CRLF, fiber->fd); tbuf_printf(out, " peer: %s" CRLF, fiber_peer_name(fiber)); tbuf_printf(out, " stack: %p" CRLF, stack_top); diff --git a/core/log_io_remote.m b/core/log_io_remote.m index eefa828637..cb96d3ddb5 100644 --- a/core/log_io_remote.m +++ b/core/log_io_remote.m @@ -186,7 +186,7 @@ recovery_follow_remote(struct recovery_state *r, const char *remote) say_crit("initializing the replica, WAL master %s", remote); snprintf(name, sizeof(name), "replica/%s", remote); - f = fiber_create(name, -1, -1, pull_from_remote, r); + f = fiber_create(name, -1, pull_from_remote, r); if (f == NULL) return; diff --git a/core/replication.m b/core/replication.m index 0c58898e9a..326762ccae 100644 --- a/core/replication.m +++ b/core/replication.m @@ -214,7 +214,7 @@ replication_init() /* create acceptor fiber */ snprintf(fiber_name, FIBER_NAME_MAXLEN, "%i/replication", cfg.replication_port); - struct fiber *acceptor = fiber_create(fiber_name, -1, -1, acceptor_handler, NULL); + struct fiber *acceptor = fiber_create(fiber_name, -1, acceptor_handler, NULL); if (acceptor == NULL) { panic("create fiber fail"); diff --git a/core/tarantool_lua.m b/core/tarantool_lua.m index 66d4615ec9..3c3ae76b8a 100644 --- a/core/tarantool_lua.m +++ b/core/tarantool_lua.m @@ -544,7 +544,7 @@ lbox_fiber_create(struct lua_State *L) luaL_error(L, "fiber.create(function): recursion limit" " reached"); } - struct fiber *f= fiber_create("lua", -1, -1, box_lua_fiber_run, NULL); + struct fiber *f= fiber_create("lua", -1, box_lua_fiber_run, NULL); lua_pushlightuserdata(L, f); /* associate coro with fiber */ struct lua_State *child_L = lua_newthread(L); diff --git a/include/fiber.h b/include/fiber.h index 66f4148a02..6ccb2deb48 100644 --- a/include/fiber.h +++ b/include/fiber.h @@ -58,16 +58,6 @@ @interface FiberCancelException: tnt_Exception @end -struct msg { - uint32_t sender_fid; - struct tbuf *msg; -}; - -struct ring { - size_t size, head, tail; - struct msg *ring[]; -}; - struct fiber { ev_io io; ev_async async; @@ -91,8 +81,6 @@ struct fiber { SLIST_ENTRY(fiber) link, zombie_link; - struct ring *inbox; - /* ASCIIZ name of this fiber. */ char name[FIBER_NAME_MAXLEN]; void (*f) (void *); @@ -131,7 +119,7 @@ extern struct fiber *fiber; void fiber_init(void); void fiber_free(void); -struct fiber *fiber_create(const char *name, int fd, int inbox_size, void (*f) (void *), void *); +struct fiber *fiber_create(const char *name, int fd, void (*f) (void *), void *); void fiber_set_name(struct fiber *fiber, const char *name); void wait_for_child(pid_t pid); @@ -151,7 +139,6 @@ void fiber_destroy_all(); bool fiber_is_caller(struct fiber *f); -struct msg *read_inbox(void); ssize_t fiber_bread(struct tbuf *, size_t v); inline static void iov_add_unsafe(const void *buf, size_t len) @@ -189,10 +176,6 @@ ssize_t iov_flush(void); /* Write everything in the fiber's iov vector to fiber socket. */ void iov_reset(); -bool write_inbox(struct fiber *recipient, struct tbuf *msg); -int inbox_size(struct fiber *recipient); -void wait_inbox(struct fiber *recipient); - const char *fiber_peer_name(struct fiber *fiber); ssize_t fiber_read(void *buf, size_t count); ssize_t fiber_write(const void *buf, size_t count); diff --git a/mod/box/memcached-grammar.m b/mod/box/memcached-grammar.m index 688930b834..2b07325cb8 100644 --- a/mod/box/memcached-grammar.m +++ b/mod/box/memcached-grammar.m @@ -969,7 +969,7 @@ tr169: #line 208 "mod/box/memcached-grammar.rl" { if (flush_delay > 0) { - struct fiber *f = fiber_create("flush_all", -1, -1, flush_all, (void *)flush_delay); + struct fiber *f = fiber_create("flush_all", -1, flush_all, (void *)flush_delay); if (f) fiber_call(f); } else @@ -991,7 +991,7 @@ tr174: #line 208 "mod/box/memcached-grammar.rl" { if (flush_delay > 0) { - struct fiber *f = fiber_create("flush_all", -1, -1, flush_all, (void *)flush_delay); + struct fiber *f = fiber_create("flush_all", -1, flush_all, (void *)flush_delay); if (f) fiber_call(f); } else @@ -1013,7 +1013,7 @@ tr185: #line 208 "mod/box/memcached-grammar.rl" { if (flush_delay > 0) { - struct fiber *f = fiber_create("flush_all", -1, -1, flush_all, (void *)flush_delay); + struct fiber *f = fiber_create("flush_all", -1, flush_all, (void *)flush_delay); if (f) fiber_call(f); } else diff --git a/mod/box/memcached-grammar.rl b/mod/box/memcached-grammar.rl index 130985e47a..22ac49629f 100644 --- a/mod/box/memcached-grammar.rl +++ b/mod/box/memcached-grammar.rl @@ -207,7 +207,7 @@ memcached_dispatch() action flush_all { if (flush_delay > 0) { - struct fiber *f = fiber_create("flush_all", -1, -1, flush_all, (void *)flush_delay); + struct fiber *f = fiber_create("flush_all", -1, flush_all, (void *)flush_delay); if (f) fiber_call(f); } else diff --git a/mod/box/memcached.m b/mod/box/memcached.m index a0d57a9ab8..e4e230568d 100644 --- a/mod/box/memcached.m +++ b/mod/box/memcached.m @@ -538,7 +538,7 @@ void memcached_start_expire() assert(memcached_expire == NULL); memcached_expire = fiber_create("memcached_expire", -1, - -1, memcached_expire_loop, NULL); + memcached_expire_loop, NULL); if (memcached_expire == NULL) say_error("can't start the expire fiber"); fiber_call(memcached_expire); -- GitLab