diff --git a/include/evio.h b/include/evio.h new file mode 100644 index 0000000000000000000000000000000000000000..7f12c7f2ec2e18fe46dafc9657d8bc24068b5b3f --- /dev/null +++ b/include/evio.h @@ -0,0 +1,115 @@ +#ifndef TARANTOOL_EVIO_H_INCLUDED +#define TARANTOOL_EVIO_H_INCLUDED +/* + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +/** + * Asynchronous IO in libev event loop. + * Requires a running libev loop. + */ +#include <stdbool.h> +#include "tarantool_ev.h" +#include "sio.h" +/** + * Exception-aware way to add a listening socket to the event + * loop. Callbacks are invoked on bind and accept events. 
+ * + * Coroutines/fibers are not used for port listeners + * since listener's job is usually simple and only involves + * creating a session for the accepted socket. The session itself + * can be built around simple libev callbacks, or around + * cooperative multitasking (on_accept callback can create + * a fiber and use coio.h (cooperative multi-tasking I/O)) API. + * + * How to use a service: + * struct evio_service *service; + * service = malloc(sizeof(struct evio_service)); + * evio_service_init(service, ..., on_accept_cb, ...); + * evio_service_start(service); + * ... + * evio_service_stop(service); + * free(service); + * + * If a service is not started, but only initialized, no + * dedicated cleanup/destruction is necessary. + */ +struct evio_service +{ + /** Service name. E.g. 'primary', 'secondary', etc. */ + char name[SERVICE_NAME_MAXLEN]; + + /** Interface/port to bind to */ + struct sockaddr_in addr; + + /** A callback invoked upon a successful bind, optional. + * If the on_bind callback throws an exception, the + * service start is aborted, and the exception is re-thrown. + */ + void (*on_bind)(void *); + void *on_bind_param; /* passed to on_bind as its only argument */ + /** + * A callback invoked on every accepted client socket. + * It's OK to throw an exception in the callback: + * when it happens, the exception is logged, and the + * accepted socket is closed. + */ + void (*on_accept)(void *, int, struct sockaddr_in *); + void *on_accept_param; /* passed to on_accept as its first argument */ + + /** libev timer object for the bind retry delay. */ + struct ev_timer timer; + /** libev io object for the acceptor socket. */ + struct ev_io ev; +}; + +/** Initialize the service. Don't bind to the port yet. */ +void +evio_service_init(struct evio_service *service, const char *name, + const char *host, int port, + void (*on_accept)(void *, int, struct sockaddr_in *), + void *on_accept_param); + +/** Set an optional callback to be invoked upon a successful bind. 
*/ +static inline void +evio_service_on_bind(struct evio_service *service, + void (*on_bind)(void *), + void *on_bind_param) +{ + service->on_bind = on_bind; + service->on_bind_param = on_bind_param; +} + +/** Bind to the port and begin listening. */ +void +evio_service_start(struct evio_service *service); + +/** If started, stop event flow and close the acceptor socket. */ +void +evio_service_stop(struct evio_service *service); + +#endif /* TARANTOOL_EVIO_H_INCLUDED */ diff --git a/include/sio.h b/include/sio.h new file mode 100644 index 0000000000000000000000000000000000000000..df79d177185d88779a78acb35c405393200d5259 --- /dev/null +++ b/include/sio.h @@ -0,0 +1,90 @@ +#ifndef TARANTOOL_SIO_H_INCLUDED +#define TARANTOOL_SIO_H_INCLUDED +/* + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +/** + * Exception-aware wrappers around BSD sockets. + * Provide better error logging and I/O statistics. + */ +#include <stdbool.h> +#include <netinet/in.h> +#include <fcntl.h> +#include <netinet/tcp.h> /* TCP_NODELAY */ +#include "exception.h" + +enum { SERVICE_NAME_MAXLEN = 32 }; + +@interface SocketError: SystemError +- (id) init: (int) fd in: (const char *) format, ...; +@end + +int sio_socket(void); + +int sio_getfl(int fd); +int sio_setfl(int fd, int flag, int on); + +void +sio_setsockopt(int fd, int level, int optname, + const void *optval, socklen_t optlen); +void +sio_getsockopt(int fd, int level, int optname, + void *optval, socklen_t *optlen); + +int sio_connect(int fd, struct sockaddr_in *addr, socklen_t addrlen); +int sio_bind(int fd, struct sockaddr_in *addr, socklen_t addrlen); +int sio_listen(int fd); +int sio_accept(int fd, struct sockaddr_in *addr, socklen_t *addrlen); + +ssize_t sio_read(int fd, void *buf, size_t count); +ssize_t sio_write(int fd, const void *buf, size_t count); +ssize_t sio_writev(int fd, const struct iovec *iov, int iovcnt); + +int sio_getpeername(int fd, struct sockaddr_in *addr); +const char *sio_strfaddr(struct sockaddr_in *addr); + +/** Skip nwr written bytes; return the first unsent iovec, update *iovcnt. */ +static inline struct iovec * +sio_advance_iov(struct iovec *iov, int *iovcnt, ssize_t nwr) +{ + struct iovec *end = iov + *iovcnt; + size_t left = nwr > 0 ? (size_t) nwr : 0; /* failed write: no progress */ + for (; iov < end; iov++) { + if (left < iov->iov_len) { + iov->iov_base = (char *) iov->iov_base + left; + iov->iov_len -= left; + break; + } + left -= iov->iov_len; + } + *iovcnt = end - iov; + return iov; +} + + 
+#endif /* TARANTOOL_SIO_H_INCLUDED */ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index e95a274392f57969f83f37e4544913548d980f92..91f012a4187137bcead4ce22cecf3ce0a3d1f64e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -83,6 +83,8 @@ set (common_sources tbuf.m palloc.m util.m + sio.m + evio.m salloc.m pickle.m coro.m diff --git a/src/evio.m b/src/evio.m new file mode 100644 index 0000000000000000000000000000000000000000..891f516b6a1ee50fcb2aba204ecbba4b4c2fd49d --- /dev/null +++ b/src/evio.m @@ -0,0 +1,215 @@ +/* + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. 
+ */ +#include "evio.h" +#include <stdio.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> + +#define BIND_RETRY_DELAY 0.1 /* seconds between bind attempts */ + +static inline int +evio_service_port(struct evio_service *service) +{ + return ntohs(service->addr.sin_port); /* host byte order */ +} + +/** + * A callback invoked by libev when acceptor socket is ready. + * Accept the socket, make it non-blocking, set SO_KEEPALIVE and + * TCP_NODELAY, then pass it to on_accept; on exception, close it and log. + */ +static void +evio_service_accept_cb(ev_io *watcher, + int revents __attribute__((unused))) +{ + struct evio_service *service = watcher->data; + int fd = -1; + + @try { + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + fd = sio_accept(service->ev.fd, &addr, &addrlen); + + if (fd < 0) /* nothing accepted (e.g. EAGAIN, EWOULDBLOCK): wait for the next event */ + return; + + int on = 1; + /* libev is non-blocking */ + sio_setfl(fd, O_NONBLOCK, on); + /* SO_KEEPALIVE to ensure connections don't hang + * around for too long when a link goes away + */ + sio_setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, + &on, sizeof(on)); + /* + * Lower latency is more important than higher + * bandwidth, and we usually write entire + * request/response in a single syscall. + */ + sio_setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, + &on, sizeof(on)); + /* + * Invoke the callback and pass it the accepted + * socket. + */ + service->on_accept(service->on_accept_param, fd, &addr); + + } @catch (tnt_Exception *e) { + if (fd >= 0) + close(fd); + [e log]; + } +} + +/** Try to bind and listen on the configured port. + * + * Throws an exception if error. + * Returns -1 if the address is already in use, and one + * needs to retry binding. + */ +static int +evio_service_bind_and_listen(struct evio_service *service) +{ + /* Create a socket. */ + int fd = sio_socket(); + + @try { + int on = 1; + /* Set appropriate options. 
*/ + sio_setfl(fd, O_NONBLOCK, on); + + sio_setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, + &on, sizeof(on)); + sio_setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, + &on, sizeof(on)); + + struct linger linger = { 0, 0 }; + + sio_setsockopt(fd, SOL_SOCKET, SO_LINGER, + &linger, sizeof(linger)); + + if (sio_bind(fd, &service->addr, sizeof(service->addr)) || + sio_listen(fd)) { + assert(errno == EADDRINUSE); + close(fd); + return -1; + } + say_info("bound to port %i", evio_service_port(service)); + + /* Invoke on_bind callback if it is set. */ + if (service->on_bind) + service->on_bind(service->on_bind_param); + + } @catch (tnt_Exception *e) { + close(fd); + @throw; + } + /* Register the socket in the event loop. */ + ev_io_set(&service->ev, fd, EV_READ); + ev_io_start(&service->ev); + return 0; +} + +/** A callback invoked by libev when sleep timer expires. + * + * Retry binding. On success, stop the timer. If the port + * is still in use, pause again. + */ +static void +evio_service_timer_cb(ev_timer *watcher, int revents __attribute__((unused))) +{ + struct evio_service *service = watcher->data; + assert(! 
ev_is_active(&service->ev)); + + if (evio_service_bind_and_listen(service) == 0) + ev_timer_stop(watcher); +} + +void +evio_service_init(struct evio_service *service, const char *name, + const char *host, int port, + void (*on_accept)(void *, int, struct sockaddr_in *), + void *on_accept_param) +{ + memset(service, 0, sizeof(struct evio_service)); + snprintf(service->name, sizeof(service->name), "%s", name); + + service->addr.sin_family = AF_INET; + service->addr.sin_port = htons(port); + if (strcmp(host, "INADDR_ANY") == 0) { + service->addr.sin_addr.s_addr = INADDR_ANY; + } else if (inet_aton(host, &service->addr.sin_addr) == 0) { + tnt_raise(SocketError, :"invalid address for bind: %s", + host); + } + service->on_accept = on_accept; + service->on_accept_param = on_accept_param; + /* + * Initialize libev objects to be able to detect if they + * are active or not in evio_service_stop(). + */ + ev_init(&service->ev, evio_service_accept_cb); + ev_init(&service->timer, evio_service_timer_cb); + service->timer.data = service->ev.data = service; +} + +/** + * Try to bind and listen. If the port is in use, + * say a warning, and start the timer which will retry + * binding periodically. + */ +void +evio_service_start(struct evio_service *service) +{ + assert(! ev_is_active(&service->ev)); + + if (evio_service_bind_and_listen(service)) { + /* Try again after a delay. */ + say_warn("port %i is already in use, will " + "retry binding after %lf seconds.", + evio_service_port(service), BIND_RETRY_DELAY); + + ev_timer_set(&service->timer, + BIND_RETRY_DELAY, BIND_RETRY_DELAY); + ev_timer_start(&service->timer); + } +} + +/** It's safe to stop a service which is not started yet. */ +void +evio_service_stop(struct evio_service *service) +{ + if (! 
ev_is_active(&service->ev)) { + ev_timer_stop(&service->timer); + } else { + ev_io_stop(&service->ev); + close(service->ev.fd); + } +} diff --git a/src/exception.m b/src/exception.m index 7456dbee25ad6b4fb770a63820f75c0a12d8964a..e233b8a433cb22ca8e904906bf9169f03b282706 100644 --- a/src/exception.m +++ b/src/exception.m @@ -68,7 +68,7 @@ { va_list ap; va_start(ap, format); - [self init: errno :format :ap]; + self = [self init: errno :format :ap]; va_end(ap); return self; } diff --git a/src/replication.m b/src/replication.m index 8884b29bb0068c8a6f30f40d1a470fe264bf29c1..6c2fc086379ab563ba053b8d44052c4fcb3aa66c 100644 --- a/src/replication.m +++ b/src/replication.m @@ -46,6 +46,7 @@ #include "fiber.h" #include "recovery.h" #include "log_io.h" +#include "evio.h" /** Replication topology * ---------------------- @@ -61,8 +62,8 @@ * incoming connections. This is done in the master to be able to * correctly handle RELOAD CONFIGURATION, which happens in the * master, and, in future, perform authentication of replication - * clients. Since the master uses fibers to serve all clients, - * replication acceptor fiber is just one of many fibers in use. + * clients. + * * Once a client socket is accepted, it is sent to the spawner * process, through the master's end of the socket pair. * @@ -74,18 +75,21 @@ * The spawner then reads EOF from its end, terminates all * children and exits. */ -static int master_to_spawner_sock; +static int master_to_spawner_socket; -/** replication_port acceptor fiber */ +/** Accept a new connection on the replication port: push the accepted socket + * to the spawner. + */ static void -acceptor_handler(void *data __attribute__((unused))); +replication_on_accept(void *data __attribute__((unused)), + int fd, struct sockaddr_in *addr __attribute__((unused))); /** Send a file descriptor to replication relay spawner. * - * @param client_sock the file descriptor to be sent. + * Invoked when spawner's end of the socketpair becomes ready. 
*/ static void -acceptor_send_sock(int client_sock); +replication_send_socket(ev_io *watcher, int events __attribute__((unused))); /** Replication spawner process */ static struct spawner { @@ -179,9 +183,8 @@ replication_prefork() if (pid != 0) { /* parent process: tarantool */ close(sockpair[1]); - master_to_spawner_sock = sockpair[0]; - if (set_nonblock(master_to_spawner_sock) == -1) - panic("set_nonblock"); + master_to_spawner_socket = sockpair[0]; + sio_setfl(master_to_spawner_socket, O_NONBLOCK, 1); } else { ev_default_fork(); ev_loop(EVLOOP_NONBLOCK); @@ -207,28 +210,12 @@ replication_init() if (cfg.replication_port == 0) return; /* replication is not in use */ - char fiber_name[FIBER_NAME_MAXLEN]; - - /* create acceptor fiber */ - snprintf(fiber_name, FIBER_NAME_MAXLEN, "%i/replication", cfg.replication_port); - - struct fiber *acceptor = fiber_create(fiber_name, -1, acceptor_handler, NULL); - - if (acceptor == NULL) { - panic("create fiber fail"); - } + static struct evio_service replication; - fiber_call(acceptor); -} + evio_service_init(&replication, "replication", cfg.bind_ipaddr, + cfg.replication_port, replication_on_accept, NULL); -int sock_set_blocking(int sock) -{ - int flags = fcntl(sock, F_GETFL, 0); - if (flags >= 0 && flags & O_NONBLOCK) - flags = fcntl(sock, F_SETFL, flags & ~O_NONBLOCK); - if (flags < 0) - say_syserror("fcntl"); - return flags; + evio_service_start(&replication); } @@ -238,55 +225,32 @@ int sock_set_blocking(int sock) /** Replication acceptor fiber handler. */ static void -acceptor_handler(void *data __attribute__((unused))) +replication_on_accept(void *data __attribute__((unused)), + int fd, struct sockaddr_in *addr __attribute__((unused))) { - if (fiber_serv_socket(fiber, cfg.replication_port, true, 0.1) != 0) { - panic("can not bind to replication port"); - } + /* + * Drop the O_NONBLOCK flag, which was possibly + * inherited from the acceptor fd (happens on + * Darwin). 
+ */ + sio_setfl(fd, O_NONBLOCK, 0); - for (;;) { - struct sockaddr_in addr; - socklen_t addrlen = sizeof(addr); - int client_sock = -1; - - /* wait new connection request */ - fiber_io_start(fiber->fd, EV_READ); - fiber_io_yield(); - - /* accept connection */ - client_sock = accept(fiber->fd, (struct sockaddr*)&addr, - &addrlen); - if (client_sock == -1) { - if (errno == EAGAIN && errno == EWOULDBLOCK) { - continue; - } - panic_syserror("accept"); - } - /* - * Drop the O_NONBLOCK flag, which was possible - * inherited from the accept fd (happens on - * Darwin). - */ - sock_set_blocking(client_sock); - /* up SO_KEEPALIVE flag */ - int keepalive = 1; - if (setsockopt(client_sock, SOL_SOCKET, SO_KEEPALIVE, - &keepalive, sizeof(int)) < 0) - /* just print error, it's not critical error */ - say_syserror("setsockopt()"); - - fiber_io_stop(fiber->fd, EV_READ); - say_info("connection from %s:%d", inet_ntoa(addr.sin_addr), - ntohs(addr.sin_port)); - acceptor_send_sock(client_sock); + struct ev_io *io = malloc(sizeof(struct ev_io)); + if (io == NULL) { + close(fd); + return; } + io->data = (void *) (intptr_t) fd; + ev_io_init(io, replication_send_socket, master_to_spawner_socket, EV_WRITE); + ev_io_start(io); } /** Send a file descriptor to the spawner. */ static void -acceptor_send_sock(int client_sock) +replication_send_socket(ev_io *watcher, int events __attribute__((unused))) { + int client_sock = (intptr_t) watcher->data; struct msghdr msg; struct iovec iov[1]; char control_buf[CMSG_SPACE(sizeof(int))]; @@ -311,15 +275,13 @@ acceptor_send_sock(int client_sock) control_message->cmsg_type = SCM_RIGHTS; *((int *) CMSG_DATA(control_message)) = client_sock; - /* wait, when interprocess comm. socket is ready for write */ - fiber_io_start(master_to_spawner_sock, EV_WRITE); - fiber_io_yield(); - /* send client socket to the spawner */ - if (sendmsg(master_to_spawner_sock, &msg, 0) < 0) + /* Send the client socket to the spawner. 
*/ + if (sendmsg(master_to_spawner_socket, &msg, 0) < 0) say_syserror("sendmsg"); - fiber_io_stop(master_to_spawner_sock, EV_WRITE); - /* close client socket in the main process */ + ev_io_stop(watcher); + free(watcher); + /* Close client socket in the main process. */ close(client_sock); } @@ -572,16 +534,16 @@ retry: static void replication_relay_recv(struct ev_io *w, int __attribute__((unused)) revents) { - int fd = *((int *)w->data); + int client_sock = (int) (intptr_t) w->data; u8 data; - int result = recv(fd, &data, sizeof(data), 0); + int rc = recv(client_sock, &data, sizeof(data), 0); - if (result == 0 || (result < 0 && errno == ECONNRESET)) { + if (rc == 0 || (rc < 0 && errno == ECONNRESET)) { say_info("the client has closed its replication socket, exiting"); exit(EXIT_SUCCESS); } - if (result < 0) + if (rc < 0) say_syserror("recv"); exit(EXIT_FAILURE); @@ -590,12 +552,13 @@ replication_relay_recv(struct ev_io *w, int __attribute__((unused)) revents) /** Send a single row to the client. */ static int -replication_relay_send_row(void *param __attribute__((unused)), struct tbuf *t) +replication_relay_send_row(void *param, struct tbuf *t) { + int client_sock = (int) (intptr_t) param; u8 *data = t->data; ssize_t bytes, len = t->size; while (len > 0) { - bytes = write(fiber->fd, data, len); + bytes = write(client_sock, data, len); if (bytes < 0) { if (errno == EPIPE) { /* socket closed on opposite site */ @@ -625,12 +588,14 @@ replication_relay_loop(int client_sock) i64 lsn; ssize_t r; - fiber->has_peer = true; - fiber->fd = client_sock; - - /* set process title and fiber name */ - memset(name, 0, sizeof(name)); - snprintf(name, sizeof(name), "relay/%s", fiber_peer_name(fiber)); + /* Set process title and fiber name. + * Even though we use only the main fiber, the logger + * uses the current fiber name. 
+ */ + struct sockaddr_in peer; + socklen_t addrlen = sizeof(peer); + getpeername(client_sock, &peer, &addrlen); + snprintf(name, sizeof(name), "relay/%s", sio_strfaddr(&peer)); fiber_set_name(fiber, name); set_proc_title("%s%s", name, custom_proc_title); @@ -651,7 +616,7 @@ replication_relay_loop(int client_sock) if (sigaction(SIGPIPE, &sa, NULL) == -1) say_syserror("sigaction"); - r = read(fiber->fd, &lsn, sizeof(lsn)); + r = read(client_sock, &lsn, sizeof(lsn)); if (r != sizeof(lsn)) { if (r < 0) { panic_syserror("read"); @@ -662,21 +627,24 @@ replication_relay_loop(int client_sock) ver = tbuf_alloc(fiber->gc_pool); tbuf_append(ver, &default_version, sizeof(default_version)); - replication_relay_send_row(NULL, ver); + replication_relay_send_row((void *)(intptr_t) client_sock, ver); /* init libev events handlers */ ev_default_loop(0); - /* init read events */ + /* + * Init a read event: when replica closes its end + * of the socket, we can read EOF and shutdown the + * relay. + */ struct ev_io sock_read_ev; - int sock_read_fd = fiber->fd; - sock_read_ev.data = (void *)&sock_read_fd; - ev_io_init(&sock_read_ev, replication_relay_recv, sock_read_fd, EV_READ); + sock_read_ev.data = (void *)(intptr_t) client_sock; + ev_io_init(&sock_read_ev, replication_relay_recv, client_sock, EV_READ); ev_io_start(&sock_read_ev); /* Initialize the recovery process */ recovery_init(cfg.snap_dir, cfg.wal_dir, - replication_relay_send_row, NULL, + replication_relay_send_row, (void *)(intptr_t) client_sock, INT32_MAX, "fsync_delay", 0, RECOVER_READONLY); /* diff --git a/src/sio.m b/src/sio.m new file mode 100644 index 0000000000000000000000000000000000000000..f548dc82e0efedd05b3d162730f8904ed593871c --- /dev/null +++ b/src/sio.m @@ -0,0 +1,279 @@ +/* + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. 
Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. 
+ */ +#include "sio.h" + +#include <errno.h> +#include <stdio.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> /* inet_ntoa */ + +#include "say.h" + +/** Pretty print socket name and peer (for exceptions) */ +static const char * +sio_socketname(int fd) +{ + static __thread char name[2 * SERVICE_NAME_MAXLEN]; + int n = snprintf(name, sizeof(name), "%d", fd); + if (fd >= 0) { + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + int rc = getsockname(fd, (struct sockaddr *) &addr, &addrlen); + if (rc == 0) { + n += snprintf(name + n, sizeof(name) - n, + ", aka %s", sio_strfaddr(&addr)); + } + addrlen = sizeof(addr); + rc = getpeername(fd, (struct sockaddr *) &addr, &addrlen); + if (rc == 0) { + n += snprintf(name + n, sizeof(name) - n, + ", peer of %s", sio_strfaddr(&addr)); + } + } + return name; +} + + +@implementation SocketError +- (id) init: (int) fd in: (const char *) format, ... +{ + int save_errno = errno; + char buf[TNT_ERRMSG_MAX]; + va_list ap; + va_start(ap, format); + vsnprintf(buf, sizeof(buf), format, ap); + va_end(ap); + const char *socketname = sio_socketname(fd); + errno = save_errno; + self = [self init: "in %s, called on %s", buf, socketname]; + return self; +} +@end + +/** Get a string representation of a socket option name, + * for logging. + */ +static const char * +sio_option_name(int option) +{ +#define CASE_OPTION(opt) case opt: return #opt + switch (option) { + CASE_OPTION(SO_KEEPALIVE); + CASE_OPTION(SO_LINGER); + CASE_OPTION(SO_ERROR); + CASE_OPTION(SO_REUSEADDR); + CASE_OPTION(TCP_NODELAY); + default: + return "undefined"; + } +#undef CASE_OPTION +} + +/** Try to automatically configure a listen backlog. + * On Linux, use the system setting, which defaults + * to 128. This way a system administrator can tune + * the backlog as needed. On other systems, use SOMAXCONN. 
+ */ +static int +sio_listen_backlog() +{ +#ifdef TARGET_OS_LINUX + FILE *proc = fopen("/proc/sys/net/core/somaxconn", "r"); + if (proc) { + int backlog; + int rc = fscanf(proc, "%d", &backlog); + fclose(proc); + if (rc == 1) + return backlog; + } +#endif /* TARGET_OS_LINUX */ + return SOMAXCONN; +} + +/** Create a TCP socket. */ +int +sio_socket(void) +{ + int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (fd < 0) + tnt_raise(SocketError, :fd in:"socket"); + return fd; +} + +/** Get socket flags, raise an exception if error. */ +int +sio_getfl(int fd) +{ + int flags = fcntl(fd, F_GETFL, 0); + if (flags < 0) + tnt_raise(SocketError, :fd in:"fcntl(..., F_GETFL, ...)"); + return flags; +} + +/** Set socket flags, raise an exception if error. */ +int +sio_setfl(int fd, int flag, int on) +{ + int flags = sio_getfl(fd); + flags = fcntl(fd, F_SETFL, on ? flags | flag : flags & ~flag); + if (flags < 0) + tnt_raise(SocketError, :fd in:"fcntl(..., F_SETFL, ...)"); + return flags; +} + +/** Set an option on a socket. */ +void +sio_setsockopt(int fd, int level, int optname, + const void *optval, socklen_t optlen) +{ + int rc = setsockopt(fd, level, optname, optval, optlen); + if (rc) { + tnt_raise(SocketError, :fd in:"setsockopt(%s)", + sio_option_name(optname)); + } +} + +/** Get a socket option value. */ +void +sio_getsockopt(int fd, int level, int optname, + void *optval, socklen_t *optlen) +{ + int rc = getsockopt(fd, level, optname, optval, optlen); + if (rc) { + tnt_raise(SocketError, :fd in:"getsockopt(%s)", + sio_option_name(optname)); + } +} + +/** Connect a client socket to a server. */ +int +sio_connect(int fd, struct sockaddr_in *addr, socklen_t addrlen) +{ + /* Establish the connection. */ + int rc = connect(fd, (struct sockaddr *) addr, addrlen); + if (rc < 0 && errno != EINPROGRESS) + tnt_raise(SocketError, :fd in:"connect"); + return rc; +} + +/** Bind a socket to the given address. 
*/ +int +sio_bind(int fd, struct sockaddr_in *addr, socklen_t addrlen) +{ + int rc = bind(fd, (struct sockaddr *) addr, addrlen); + if (rc < 0 && errno != EADDRINUSE) + tnt_raise(SocketError, :fd in:"bind"); + return rc; +} + +/** Mark a socket as accepting connections; returns -1 on EADDRINUSE. */ +int +sio_listen(int fd) +{ + int rc = listen(fd, sio_listen_backlog()); + if (rc < 0 && errno != EADDRINUSE) + tnt_raise(SocketError, :fd in:"listen"); + return rc; +} + +/** Accept a client connection; returns -1 on EAGAIN/EWOULDBLOCK/EINTR. */ +int +sio_accept(int fd, struct sockaddr_in *addr, socklen_t *addrlen) +{ + int newfd = accept(fd, (struct sockaddr *) addr, addrlen); + if (newfd < 0 && errno != EAGAIN && + errno != EWOULDBLOCK && errno != EINTR) + tnt_raise(SocketError, :fd in:"accept"); + return newfd; +} + +/** Read up to 'count' bytes; returns -1 on EAGAIN/EWOULDBLOCK/EINTR. */ +ssize_t +sio_read(int fd, void *buf, size_t count) +{ + ssize_t n = read(fd, buf, count); + if (n < 0 && errno != EAGAIN && + errno != EWOULDBLOCK && errno != EINTR) + tnt_raise(SocketError, :fd in:"read(%zu)", count); + return n; +} + +/** Write up to 'count' bytes; returns -1 on EAGAIN/EWOULDBLOCK/EINTR. */ +ssize_t +sio_write(int fd, const void *buf, size_t count) +{ + ssize_t n = write(fd, buf, count); + if (n < 0 && errno != EAGAIN && + errno != EWOULDBLOCK && errno != EINTR) + tnt_raise(SocketError, :fd in:"write(%zu)", count); + return n; +} + +/** Write to a socket with iovec, at most IOV_MAX entries per call. */ +ssize_t +sio_writev(int fd, const struct iovec *iov, int iovcnt) +{ + int cnt = iovcnt < IOV_MAX ? iovcnt : IOV_MAX; + ssize_t n = writev(fd, iov, cnt); + if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK && + errno != EINTR) { + tnt_raise(SocketError, :fd in:"writev(%d)", iovcnt); + } + return n; +} + +/** Get socket peer name. 
*/ +int +sio_getpeername(int fd, struct sockaddr_in *addr) +{ + socklen_t addrlen = sizeof(struct sockaddr_in); + if (getpeername(fd, (struct sockaddr *) addr, &addrlen) < 0) { + say_syserror("getpeername"); + return -1; + } + /* XXX: I've no idea where this is copy-pasted from. */ + if (addr->sin_addr.s_addr == 0) { + say_syserror("getpeername: empty peer"); + return -1; + } + return 0; +} + +/** Pretty print a peer address. */ +const char * +sio_strfaddr(struct sockaddr_in *addr) +{ + static __thread char name[SERVICE_NAME_MAXLEN]; + snprintf(name, sizeof(name), "%s:%d", + inet_ntoa(addr->sin_addr), ntohs(addr->sin_port)); + return name; +} + diff --git a/src/tarantool.m b/src/tarantool.m index 135c036129af785dceb8c2ac836df9b355c6e012..2c29e568b33d3c13ea1da8860d2a155436945272 100644 --- a/src/tarantool.m +++ b/src/tarantool.m @@ -801,33 +801,36 @@ main(int argc, char **argv) atexit(tarantool_free); ev_default_loop(EVFLAG_AUTO); - initialize(cfg.slab_alloc_arena, cfg.slab_alloc_minimal, cfg.slab_alloc_factor); replication_prefork(); signal_init(); - tarantool_L = tarantool_lua_init(); - mod_init(); - tarantool_lua_load_cfg(tarantool_L, &cfg); - admin_init(); - replication_init(); - /* - * Load user init script. The script should have access - * to Tarantool Lua API (box.cfg, box.fiber, etc...) that - * is why script must run only after the server was fully - * initialized. - */ - tarantool_lua_load_init_script(tarantool_L); - - prelease(fiber->gc_pool); - say_crit("log level %i", cfg.log_level); - say_crit("entering event loop"); - if (cfg.io_collect_interval > 0) - ev_set_io_collect_interval(cfg.io_collect_interval); - ev_now_update(); - start_time = ev_now(); - ev_loop(0); + + @try { + tarantool_L = tarantool_lua_init(); + mod_init(); + tarantool_lua_load_cfg(tarantool_L, &cfg); + admin_init(); + replication_init(); + /* + * Load user init script. The script should have access + * to Tarantool Lua API (box.cfg, box.fiber, etc...) 
that + * is why script must run only after the server was fully + * initialized. + */ + tarantool_lua_load_init_script(tarantool_L); + prelease(fiber->gc_pool); + say_crit("log level %i", cfg.log_level); + say_crit("entering event loop"); + if (cfg.io_collect_interval > 0) + ev_set_io_collect_interval(cfg.io_collect_interval); + ev_now_update(); + start_time = ev_now(); + ev_loop(0); + } @catch (tnt_Exception *e) { + panic("%s", [e errmsg]); + } say_crit("exiting loop"); /* freeing resources */ return 0;