diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 201bc1172e09dfbae1e93a036557abf20b4b7381..7b711885b9db53744f733f3d8be92303573596e5 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -185,9 +185,7 @@ struct iproto_thread { * Iproto thread id */ uint32_t id; - /* - * Iproto binary listener - */ + /** Array of iproto binary listeners */ struct evio_service binary; /** Requests count currently pending in stream queue. */ size_t requests_in_stream_queue; @@ -257,14 +255,18 @@ unsigned iproto_readahead = 16320; /* The maximal number of iproto messages in fly. */ static int iproto_msg_max = IPROTO_MSG_MAX_MIN; +int +iproto_addr_count(void) +{ + return evio_service_count(&tx_binary); +} + const char * -iproto_bound_address(char *buf) +iproto_addr_str(char *buf, int idx) { - if (tx_binary.addr_len == 0) - return NULL; - sio_addr_snprintf(buf, SERVICE_NAME_MAXLEN, - (struct sockaddr *)&tx_binary.addrstorage, - tx_binary.addr_len); + socklen_t size; + const struct sockaddr *addr = evio_service_addr(&tx_binary, idx, &size); + sio_addr_snprintf(buf, SERVICE_NAME_MAXLEN, addr, size); return buf; } @@ -2962,17 +2964,6 @@ struct iproto_cfg_msg: public cbus_call_msg struct iproto_thread *iproto_thread; }; -static inline void -iproto_thread_fill_binary(struct iproto_thread *iproto_thread, - struct evio_service *binary) -{ - strcpy(iproto_thread->binary.host, binary->host); - strcpy(iproto_thread->binary.serv, binary->serv); - iproto_thread->binary.addrstorage = binary->addrstorage; - iproto_thread->binary.addr_len = binary->addr_len; - ev_io_set(&iproto_thread->binary.ev, binary->ev.fd, EV_READ); -} - static inline void iproto_cfg_msg_create(struct iproto_cfg_msg *msg, enum iproto_cfg_op op) { @@ -3004,6 +2995,7 @@ iproto_do_cfg_f(struct cbus_call_msg *m) struct iproto_cfg_msg *cfg_msg = (struct iproto_cfg_msg *) m; int old; struct iproto_thread *iproto_thread = cfg_msg->iproto_thread; + struct evio_service *binary = &iproto_thread->binary; struct errinj *inj; try { @@ -3024,17 +3016,19 @@ iproto_do_cfg_f(struct cbus_call_msg *m) "iproto listen"); diag_raise(); } - if (evio_service_is_active(&iproto_thread->binary)) { + if (evio_service_is_active(binary)) { diag_set(ClientError, ER_UNSUPPORTED, "Iproto", "listen if service already active"); diag_raise(); } - iproto_thread_fill_binary(iproto_thread, cfg_msg->binary); - if (evio_service_listen(&iproto_thread->binary) != 0) + evio_service_create(loop(), binary, "binary", + iproto_on_accept, iproto_thread); + evio_service_attach(binary, cfg_msg->binary); + if (evio_service_listen(binary) != 0) diag_raise(); break; case IPROTO_CFG_STOP: - evio_service_detach(&iproto_thread->binary); + evio_service_detach(binary); break; case IPROTO_CFG_STAT: iproto_fill_stat(iproto_thread, cfg_msg); @@ -3096,17 +3090,14 @@ iproto_listen(const char *uri) iproto_send_stop_msg(); evio_service_stop(&tx_binary); evio_service_create(loop(), &tx_binary, "tx_binary", NULL, NULL); - if (uri == NULL) { - tx_binary.addr_len = 0; - return 0; - } /* - * Please note, we bind socket in main thread, and then - * listen this socket in all iproto threads! With this + * Please note, we bind sockets in main thread, and then + * listen these sockets in all iproto threads! With this * implementation, we rely on the Linux kernel to distribute * incoming connections across iproto threads. */ - if (evio_service_bind(&tx_binary, uri) != 0) + const char *uris[] = { uri }; + if (evio_service_bind(&tx_binary, uris, (uri != NULL ? 1 : 0)) != 0) return -1; if (iproto_send_listen_msg(&tx_binary) != 0) return -1; @@ -3197,8 +3188,8 @@ iproto_free(void) free(iproto_threads); /* - * Here we close socket and unlink unix socket path. - * in case it's unix socket. + * Here we close sockets and unlink all unix socket paths. + * in case it's unix sockets. */ evio_service_stop(&tx_binary); } diff --git a/src/box/iproto.h b/src/box/iproto.h index 8c1d892dc24cb994fcadc9e090943534912c6cea..f76548c9960a3900afa321cdf10899293a02e230 100644 --- a/src/box/iproto.h +++ b/src/box/iproto.h @@ -94,11 +94,18 @@ void iproto_reset_stat(void); /** - * String representation of the address served by - * iproto. To be shown in box.info. + * Return count of the addresses currently served by iproto. + */ +int +iproto_addr_count(void); + +/** + * Return representation of the address served by iproto by + * it's @a idx. @a buf should have at least SERVICE_NAME_MAXLEN + * size. */ const char * -iproto_bound_address(char *buf); +iproto_addr_str(char *buf, int idx); int iproto_rmean_foreach(void *cb, void *cb_ctx); diff --git a/src/box/lua/info.c b/src/box/lua/info.c index 42068cef2e11a886f775e56ab3e7bddab86e02f9..8e02f65940de5a7b53688219464f80a675d864f2 100644 --- a/src/box/lua/info.c +++ b/src/box/lua/info.c @@ -592,9 +592,21 @@ lbox_info_sql(struct lua_State *L) static int lbox_info_listen(struct lua_State *L) { - /* NULL is ok, no need to check. */ + int count = iproto_addr_count(); + if (count == 0) { + lua_pushnil(L); + return 1; + } char addrbuf[SERVICE_NAME_MAXLEN]; - lua_pushstring(L, iproto_bound_address(addrbuf)); + if (count == 1) { + lua_pushstring(L, iproto_addr_str(addrbuf, 0)); + return 1; + } + lua_createtable(L, count, 0); + for (int i = 0; i < count; i++) { + lua_pushstring(L, iproto_addr_str(addrbuf, i)); + lua_rawseti(L, -2, i + 1); + } return 1; } diff --git a/src/lib/core/coio.c b/src/lib/core/coio.c index 72e935d46c0b684798e54735fe83be13b8f77d0d..05580bbb6035bd9d0fdfc8b9a7c6046f07a72762 100644 --- a/src/lib/core/coio.c +++ b/src/lib/core/coio.c @@ -507,7 +507,8 @@ coio_service_init(struct coio_service *service, const char *name, int coio_service_start(struct evio_service *service, const char *uri) { - if (evio_service_bind(service, uri) != 0 || + const char *uris[] = { uri }; + if (evio_service_bind(service, uris, 1) != 0 || evio_service_listen(service) != 0) return -1; return 0; diff --git a/src/lib/core/evio.c b/src/lib/core/evio.c index 9eb71e008eb88c2864602349be589077cc58f3d4..790562524a69f765bbb25026e8bbb945052462c8 100644 --- a/src/lib/core/evio.c +++ b/src/lib/core/evio.c @@ -39,6 +39,30 @@ #include <trivia/util.h> #include "exception.h" +struct evio_service_entry { + /** Bind host:service, useful for logging */ + char host[URI_MAXHOST]; + char serv[URI_MAXSERVICE]; + + /** Interface/port to bind to */ + union { + struct sockaddr addr; + struct sockaddr_storage addrstorage; + }; + socklen_t addr_len; + + /** libev io object for the acceptor socket. */ + struct ev_io ev; + /** Pointer to the root evio_service, which contains this object */ + struct evio_service *service; +}; + +static inline bool +evio_service_entry_is_active(const struct evio_service_entry *entry) +{ + return entry->ev.fd >= 0; +} + static int evio_setsockopt_keepalive(int fd) { @@ -140,11 +164,12 @@ evio_service_name(struct evio_service *service) * callback. */ static void -evio_service_accept_cb(ev_loop *loop, ev_io *watcher, int events) +evio_service_entry_accept_cb(ev_loop *loop, ev_io *watcher, int events) { (void) loop; (void) events; - struct evio_service *service = (struct evio_service *) watcher->data; + struct evio_service_entry *entry = + (struct evio_service_entry *)watcher->data; int fd; while (1) { /* @@ -154,7 +179,7 @@ evio_service_accept_cb(ev_loop *loop, ev_io *watcher, int events) */ struct sockaddr_storage addr; socklen_t addrlen = sizeof(addr); - fd = sio_accept(service->ev.fd, (struct sockaddr *)&addr, + fd = sio_accept(entry->ev.fd, (struct sockaddr *)&addr, &addrlen); if (fd < 0) { @@ -162,11 +187,12 @@ evio_service_accept_cb(ev_loop *loop, ev_io *watcher, int events) break; return; } - if (evio_setsockopt_client(fd, service->addr.sa_family, + if (evio_setsockopt_client(fd, entry->addr.sa_family, SOCK_STREAM) != 0) break; - if (service->on_accept(service, fd, (struct sockaddr *)&addr, - addrlen) != 0) + if (entry->service->on_accept(entry->service, fd, + (struct sockaddr *)&addr, + addrlen) != 0) break; } if (fd >= 0) @@ -179,7 +205,7 @@ evio_service_accept_cb(ev_loop *loop, ev_io *watcher, int events) * listening on it. Unlink the file if it's the case. */ static int -evio_service_reuse_addr(const char *uri) +evio_service_entry_reuse_addr(const char *uri) { struct uri u; if (uri_parse(&u, uri) || u.service == NULL) { @@ -219,35 +245,37 @@ evio_service_reuse_addr(const char *uri) * Throws an exception if error. */ static int -evio_service_bind_addr(struct evio_service *service) +evio_service_entry_bind_addr(struct evio_service_entry *entry) { - say_debug("%s: binding to %s...", evio_service_name(service), - sio_strfaddr(&service->addr, service->addr_len)); + say_debug("%s: binding to %s...", + evio_service_name(entry->service), + sio_strfaddr(&entry->addr, entry->addr_len)); /* Create a socket. */ - int fd = sio_socket(service->addr.sa_family, + int fd = sio_socket(entry->addr.sa_family, SOCK_STREAM, IPPROTO_TCP); if (fd < 0) return -1; - if (evio_setsockopt_server(fd, service->addr.sa_family, + if (evio_setsockopt_server(fd, entry->addr.sa_family, SOCK_STREAM) != 0) goto error; - if (sio_bind(fd, &service->addr, service->addr_len) != 0) + if (sio_bind(fd, &entry->addr, entry->addr_len) != 0) goto error; /* * After binding a result address may be different. For * example, if a port was 0. */ - if (sio_getsockname(fd, &service->addr, &service->addr_len) != 0) + if (sio_getsockname(fd, &entry->addr, &entry->addr_len) != 0) goto error; - say_info("%s: bound to %s", evio_service_name(service), - sio_strfaddr(&service->addr, service->addr_len)); + say_info("%s: bound to %s", + evio_service_name(entry->service), + sio_strfaddr(&entry->addr, entry->addr_len)); /* Register the socket in the event loop. */ - ev_io_set(&service->ev, fd, EV_READ); + ev_io_set(&entry->ev, fd, EV_READ); return 0; error: close(fd); @@ -259,71 +287,64 @@ evio_service_bind_addr(struct evio_service *service) * * @retval 0 for success */ -int -evio_service_listen(struct evio_service *service) +static int +evio_service_entry_listen(struct evio_service_entry *entry) { - say_debug("%s: listening on %s...", evio_service_name(service), - sio_strfaddr(&service->addr, service->addr_len)); + say_debug("%s: listening on %s...", + evio_service_name(entry->service), + sio_strfaddr(&entry->addr, entry->addr_len)); - int fd = service->ev.fd; + int fd = entry->ev.fd; if (sio_listen(fd)) return -1; - ev_io_start(service->loop, &service->ev); + ev_io_start(entry->service->loop, &entry->ev); return 0; } -void -evio_service_create(ev_loop *loop, struct evio_service *service, - const char *name, evio_accept_f on_accept, - void *on_accept_param) +static void +evio_service_entry_create(struct evio_service_entry *entry, + struct evio_service *service) { - memset(service, 0, sizeof(struct evio_service)); - snprintf(service->name, sizeof(service->name), "%s", name); - - service->loop = loop; - - service->on_accept = on_accept; - service->on_accept_param = on_accept_param; + memset(entry, 0, sizeof(struct evio_service_entry)); /* * Initialize libev objects to be able to detect if they - * are active or not in evio_service_stop(). + * are active or not in evio_service_entry_stop(). */ - ev_init(&service->ev, evio_service_accept_cb); - ev_io_set(&service->ev, -1, 0); - service->ev.data = service; + ev_init(&entry->ev, evio_service_entry_accept_cb); + ev_io_set(&entry->ev, -1, 0); + entry->ev.data = entry; + entry->service = service; } /** * Try to bind. */ -int -evio_service_bind(struct evio_service *service, const char *uri) +static int +evio_service_entry_bind(struct evio_service_entry *entry, const char *uri) { - if (evio_service_reuse_addr(uri) != 0) - return -1; struct uri u; if (uri_parse(&u, uri) || u.service == NULL) { diag_set(IllegalParams, "invalid uri for bind: %s", uri); return -1; } - snprintf(service->serv, sizeof(service->serv), "%.*s", + snprintf(entry->serv, sizeof(entry->serv), "%.*s", (int) u.service_len, u.service); if (u.host != NULL && strncmp(u.host, "*", u.host_len) != 0) { - snprintf(service->host, sizeof(service->host), "%.*s", - (int) u.host_len, u.host); - } /* else { service->host[0] = '\0'; } */ + snprintf(entry->host, sizeof(entry->host), "%.*s", + (int) u.host_len, u.host); + } /* else { entry->host[0] = '\0'; } */ - assert(! ev_is_active(&service->ev)); + assert(! ev_is_active(&entry->ev)); - if (strcmp(service->host, URI_HOST_UNIX) == 0) { + if (strcmp(entry->host, URI_HOST_UNIX) == 0) { /* UNIX domain socket */ - struct sockaddr_un *un = (struct sockaddr_un *) &service->addr; - service->addr_len = sizeof(*un); + struct sockaddr_un *un = (struct sockaddr_un *) &entry->addr; + entry->addr_len = sizeof(*un); snprintf(un->sun_path, sizeof(un->sun_path), "%s", - service->serv); + entry->serv); un->sun_family = AF_UNIX; - return evio_service_bind_addr(service); + return evio_service_entry_bind_addr(entry); } /* IP socket */ @@ -334,59 +355,179 @@ evio_service_bind(struct evio_service *service, const char *uri) hints.ai_flags = AI_PASSIVE|AI_ADDRCONFIG; /* make no difference between empty string and NULL for host */ - if (getaddrinfo(*service->host ? service->host : NULL, service->serv, + if (getaddrinfo(*entry->host ? entry->host : NULL, entry->serv, &hints, &res) != 0 || res == NULL) { diag_set(SocketError, sio_socketname(-1), "can't resolve uri for bind"); return -1; } for (struct addrinfo *ai = res; ai != NULL; ai = ai->ai_next) { - memcpy(&service->addr, ai->ai_addr, ai->ai_addrlen); - service->addr_len = ai->ai_addrlen; - if (evio_service_bind_addr(service) == 0) { + memcpy(&entry->addr, ai->ai_addr, ai->ai_addrlen); + entry->addr_len = ai->ai_addrlen; + if (evio_service_entry_bind_addr(entry) == 0) { freeaddrinfo(res); return 0; } say_error("%s: failed to bind on %s: %s", - evio_service_name(service), + evio_service_name(entry->service), sio_strfaddr(ai->ai_addr, ai->ai_addrlen), diag_last_error(diag_get())->errmsg); } freeaddrinfo(res); diag_set(SocketError, sio_socketname(-1), "%s: failed to bind", - evio_service_name(service)); + evio_service_name(entry->service)); return -1; } -/** It's safe to stop a service which is not started yet. */ -void -evio_service_stop(struct evio_service *service) +static void +evio_service_entry_detach(struct evio_service_entry *entry) { - say_info("%s: stopped", evio_service_name(service)); + if (ev_is_active(&entry->ev)) { + ev_io_stop(entry->service->loop, &entry->ev); + entry->addr_len = 0; + } + ev_io_set(&entry->ev, -1, 0); +} - int service_fd = service->ev.fd; - evio_service_detach(service); +/** It's safe to stop a service entry which is not started yet. */ +static void +evio_service_entry_stop(struct evio_service_entry *entry) +{ + int service_fd = entry->ev.fd; + evio_service_entry_detach(entry); if (service_fd < 0) return; if (close(service_fd) < 0) say_error("Failed to close socket: %s", strerror(errno)); - if (service->addr.sa_family != AF_UNIX) + if (entry->addr.sa_family != AF_UNIX) return; - if (unlink(((struct sockaddr_un *)&service->addr)->sun_path) < 0) { + if (unlink(((struct sockaddr_un *)&entry->addr)->sun_path) < 0) { say_error("Failed to unlink unix " "socket path: %s", strerror(errno)); } } +static void +evio_service_entry_attach(struct evio_service_entry *dst, + const struct evio_service_entry *src) +{ + assert(!ev_is_active(&dst->ev)); + strcpy(dst->host, src->host); + strcpy(dst->serv, src->serv); + dst->addrstorage = src->addrstorage; + dst->addr_len = src->addr_len; + ev_io_set(&dst->ev, src->ev.fd, EV_READ); +} + +static inline int +evio_service_reuse_addr(const char **uris, int size) +{ + for (int i = 0; i < size; i++) { + if (evio_service_entry_reuse_addr(uris[i]) != 0) + return -1; + } + return 0; +} + +static void +evio_service_create_entries(struct evio_service *service, int size) +{ + service->entry_count = size; + service->entries = (size != 0 ? + xmalloc(size *sizeof(struct evio_service_entry)) : NULL); + for (int i = 0; i < service->entry_count; i++) + evio_service_entry_create(&service->entries[i], service); +} + +int +evio_service_count(const struct evio_service *service) +{ + return service->entry_count; +} + +const struct sockaddr * +evio_service_addr(const struct evio_service *service, int idx, socklen_t *size) +{ + assert(idx < service->entry_count); + const struct evio_service_entry *e = &service->entries[idx]; + *size = e->addr_len; + return &e->addr; +} + +void +evio_service_create(struct ev_loop *loop, struct evio_service *service, + const char *name, evio_accept_f on_accept, + void *on_accept_param) +{ + memset(service, 0, sizeof(struct evio_service)); + snprintf(service->name, sizeof(service->name), "%s", name); + service->loop = loop; + service->on_accept = on_accept; + service->on_accept_param = on_accept_param; +} + +void +evio_service_attach(struct evio_service *dst, const struct evio_service *src) +{ + assert(dst->entry_count == 0); + evio_service_create_entries(dst, src->entry_count); + for (int i = 0; i < src->entry_count; i++) + evio_service_entry_attach(&dst->entries[i], &src->entries[i]); +} + void evio_service_detach(struct evio_service *service) { - if (ev_is_active(&service->ev)) { - ev_io_stop(service->loop, &service->ev); - service->addr_len = 0; + for (int i = 0; i < service->entry_count; i++) + evio_service_entry_detach(&service->entries[i]); + free(service->entries); + service->entry_count = 0; + service->entries = NULL; +} + +bool +evio_service_is_active(const struct evio_service *service) +{ + for (int i = 0; i < service->entry_count; i++) { + if (evio_service_entry_is_active(&service->entries[i])) + return true; } - ev_io_set(&service->ev, -1, 0); + return false; +} + +int +evio_service_listen(struct evio_service *service) +{ + for (int i = 0; i < service->entry_count; i++) { + if (evio_service_entry_listen(&service->entries[i]) != 0) + return -1; + } + return 0; +} + +void +evio_service_stop(struct evio_service *service) +{ + say_info("%s: stopped", evio_service_name(service)); + for (int i = 0; i < service->entry_count; i++) + evio_service_entry_stop(&service->entries[i]); + free(service->entries); + service->entry_count = 0; + service->entries = NULL; +} + +int +evio_service_bind(struct evio_service *service, const char **uris, int size) +{ + if (evio_service_reuse_addr(uris, size) != 0) + return -1; + evio_service_create_entries(service, size); + for (int i = 0; i < size; i++) { + if (evio_service_entry_bind(&service->entries[i], uris[i]) != 0) + return -1; + } + return 0; } diff --git a/src/lib/core/evio.h b/src/lib/core/evio.h index 93693b0efbb2e1857e4613e8e485b7837991ddfb..862afcdcee96987b2c72acb2249758b7ec2bcbf2 100644 --- a/src/lib/core/evio.h +++ b/src/lib/core/evio.h @@ -55,60 +55,58 @@ extern "C" { * a fiber and use coio.h (cooperative multi-tasking I/O)) API. * * How to use a service: - * struct evio_service *service; - * service = malloc(sizeof(struct evio_service)); - * evio_service_create(service, ..., on_accept_cb, ...); - * evio_service_bind(service); - * evio_service_listen(service); + * struct evio_service service; + * evio_service_create(loop(), &service, ..., on_accept_cb, ...); + * evio_service_bind(&service); + * evio_service_listen(&service); * ... - * evio_service_stop(service); - * free(service); - * - * If a service is not started, but only initialized, no - * dedicated cleanup/destruction is necessary. + * evio_service_stop(&service); */ +struct evio_service_entry; struct evio_service; typedef int (*evio_accept_f)(struct evio_service *, int, struct sockaddr *, - socklen_t); - -struct evio_service -{ - /** Service name. E.g. 'primary', 'secondary', etc. */ - char name[SERVICE_NAME_MAXLEN]; - /** Bind host:service, useful for logging */ - char host[URI_MAXHOST]; - char serv[URI_MAXSERVICE]; - - /** Interface/port to bind to */ - union { - struct sockaddr addr; - struct sockaddr_storage addrstorage; - }; - socklen_t addr_len; - - /** - * A callback invoked on every accepted client socket. - * If a callback returned != 0, the accepted socket is - * closed and the error is logged. - */ - evio_accept_f on_accept; - void *on_accept_param; - - /** libev io object for the acceptor socket. */ - struct ev_io ev; - ev_loop *loop; + socklen_t); + +struct evio_service { + /** Total count of services */ + int entry_count; + /** Array of structures that encapsulate work with sockets */ + struct evio_service_entry *entries; + /** Service name. E.g. 'primary', 'secondary', etc. */ + char name[SERVICE_NAME_MAXLEN]; + /** + * A callback invoked on every accepted client socket. + * If a callback returned != 0, the accepted socket is + * closed and the error is logged. + */ + evio_accept_f on_accept; + void *on_accept_param; + ev_loop *loop; }; +/** + * Return count of service entries in @a service + */ +int +evio_service_count(const struct evio_service *service); + +/** + * Return struct which represent address served by entry with + * @a idx index in @a service. @a size contains structure length. + */ +const struct sockaddr * +evio_service_addr(const struct evio_service *service, int idx, socklen_t *size); + /** Initialize the service. Don't bind to the port yet. */ void -evio_service_create(ev_loop *loop, struct evio_service *service, +evio_service_create(struct ev_loop *loop, struct evio_service *service, const char *name, evio_accept_f on_accept, void *on_accept_param); /** Bind service to specified uri */ int -evio_service_bind(struct evio_service *service, const char *uri); +evio_service_bind(struct evio_service *service, const char **uris, int size); /** * Listen on bounded socket @@ -126,11 +124,14 @@ evio_service_detach(struct evio_service *service); void evio_service_stop(struct evio_service *service); -static inline bool -evio_service_is_active(struct evio_service *service) -{ - return service->ev.fd >= 0; -} +/** + * Updates @a dst evio_service socket settings according @a src evio service. + */ +void +evio_service_attach(struct evio_service *dst, const struct evio_service *src); + +bool +evio_service_is_active(const struct evio_service *service); static inline void evio_timeout_init(ev_loop *loop, ev_tstamp *start, ev_tstamp *delay,