From 2ee7590aab38c30dcc4242f63f8dfde3d82bdb6f Mon Sep 17 00:00:00 2001 From: Alexandr <a.lyapunov@corp.mail.ru> Date: Mon, 30 Sep 2013 18:41:30 +0400 Subject: [PATCH] fixes according to CR --- CMakeLists.txt | 8 ++--- cfg/tarantool_box_cfg.c | 30 ++++++++++++++++++ cfg/tarantool_box_cfg.h | 6 ++++ include/recovery.h | 9 ++++++ include/sio.h | 44 +++++++++++++------------- src/CMakeLists.txt | 1 + src/box/box.cc | 15 +++++++-- src/recovery.cc | 68 +++++++++++++++++++++++++++++------------ src/replica.1.5.cc | 28 +++++++++-------- src/replica.cc | 22 ++++++++----- src/replication.cc | 25 +++++++++------ src/sio.cc | 13 +++----- 12 files changed, 180 insertions(+), 89 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index c18672f6a9..b41beac198 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -80,10 +80,10 @@ check_function_exists(memmem HAVE_MEMMEM) check_function_exists(memrchr HAVE_MEMRCHR) check_function_exists(sendfile HAVE_SENDFILE) if (HAVE_SENDFILE) -message (STATUS "sendfile found, let's find what version was found") -check_prototype_definition(sendfile "ssize_t sendfile(int out_fd, int in_fd, off_t * offset, size_t count)" "0" "sys/sendfile.h" HAVE_SENDFILE_LINUX) -check_prototype_definition(sendfile "int sendfile(int fd, int s, off_t offset, size_t nbytes, struct sf_hdtr *hdtr, off_t *sbytes, int flags)" "0" "" HAVE_SENDFILE_BSD) -endif () + message (STATUS "sendfile found, let's find what version was found") + check_prototype_definition(sendfile "ssize_t sendfile(int out_fd, int in_fd, off_t * offset, size_t count)" "0" "sys/sendfile.h" HAVE_SENDFILE_LINUX) + check_prototype_definition(sendfile "int sendfile(int fd, int s, off_t offset, size_t nbytes, struct sf_hdtr *hdtr, off_t *sbytes, int flags)" "0" "" HAVE_SENDFILE_BSD) +endif() # # Some versions of GNU libc define non-portable __libc_stack_end diff --git a/cfg/tarantool_box_cfg.c b/cfg/tarantool_box_cfg.c index 8075630bf8..704dc22ab5 100644 --- a/cfg/tarantool_box_cfg.c +++ b/cfg/tarantool_box_cfg.c @@ -60,6 +60,7 @@ init_tarantool_cfg(tarantool_cfg *c) { c->too_long_threshold = 0; c->custom_proc_title = NULL; c->replication_source = NULL; + c->replica_1_5_mode = false; } int @@ -104,6 +105,7 @@ fill_default_tarantool_cfg(tarantool_cfg *c) { c->too_long_threshold = 0.5; c->custom_proc_title = NULL; c->replication_source = NULL; + c->replica_1_5_mode = false; return 0; } @@ -210,6 +212,9 @@ static NameAtom _name__custom_proc_title[] = { static NameAtom _name__replication_source[] = { { "replication_source", -1, NULL } }; +static NameAtom _name__replica_1_5_mode[] = { + { "replica_1_5_mode", -1, NULL } +}; #define ARRAYALLOC(x,n,t,_chk_ro, __flags) do { \ int l = 0, ar; \ @@ -693,6 +698,31 @@ acceptValue(tarantool_cfg* c, OptDef* opt, int check_rdonly) { if (opt->paramValue.scalarval && c->replication_source == NULL) return CNF_NOMEMORY; } + else if ( cmpNameAtoms( opt->name, _name__replica_1_5_mode) ) { + if (opt->paramType != scalarType ) + return CNF_WRONGTYPE; + c->__confetti_flags &= ~CNF_FLAG_STRUCT_NOTSET; + errno = 0; + bool res; + + if (strcasecmp(opt->paramValue.scalarval, "true") == 0 || + strcasecmp(opt->paramValue.scalarval, "yes") == 0 || + strcasecmp(opt->paramValue.scalarval, "enable") == 0 || + strcasecmp(opt->paramValue.scalarval, "on") == 0 || + strcasecmp(opt->paramValue.scalarval, "1") == 0 ) + res = true; + else if (strcasecmp(opt->paramValue.scalarval, "false") == 0 || + strcasecmp(opt->paramValue.scalarval, "no") == 0 || + strcasecmp(opt->paramValue.scalarval, "disable") == 0 || + strcasecmp(opt->paramValue.scalarval, "off") == 0 || + strcasecmp(opt->paramValue.scalarval, "0") == 0 ) + res = false; + else + return CNF_WRONGRANGE; + if (check_rdonly && c->replica_1_5_mode != res) + return CNF_RDONLY; + c->replica_1_5_mode = res; + } else { return opt->optional ? CNF_OPTIONAL : CNF_MISSED; } diff --git a/cfg/tarantool_box_cfg.h b/cfg/tarantool_box_cfg.h index 1073c3937c..016b84d68c 100644 --- a/cfg/tarantool_box_cfg.h +++ b/cfg/tarantool_box_cfg.h @@ -159,6 +159,12 @@ typedef struct tarantool_cfg { * only accepts reads. */ char* replication_source; + + /* + * 1.5 (Old) replication mode + * If enabled, the replica will successfully connect to 1.5 master + */ + bool replica_1_5_mode; } tarantool_cfg; #ifndef CNF_FLAG_STRUCT_NEW diff --git a/include/recovery.h b/include/recovery.h index 312759cda3..e34f977d7c 100644 --- a/include/recovery.h +++ b/include/recovery.h @@ -137,6 +137,9 @@ int read_log(const char *filename, void recovery_follow_remote(struct recovery_state *r, const char *addr); void recovery_stop_remote(struct recovery_state *r); +void recovery_follow_remote_1_5(struct recovery_state *r, const char *addr); +void recovery_stop_remote_1_5(struct recovery_state *r); + enum rpl_request_type { RPL_GET_WAL = 0, RPL_GET_SNAPSHOT @@ -153,6 +156,12 @@ void snapshot_save(struct recovery_state *r, void init_storage(struct log_dir *dir); +/** + * Get version (defined in PACKAGE_VERSION), packed into uint32_t + * The highest byte or result means major version, next - minor etc. + */ +uint32_t get_package_version_packed(); + #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ diff --git a/include/sio.h b/include/sio.h index f3aa600d2b..8d189490af 100644 --- a/include/sio.h +++ b/include/sio.h @@ -39,6 +39,7 @@ #include <netdb.h> #include <fcntl.h> #include "exception.h" +#include <tarantool_ev.h> enum { SERVICE_NAME_MAXLEN = 32 }; @@ -54,7 +55,6 @@ class FDHolder { ~FDHolder(); int Release(); void Reset(int _fd = -1); - operator int(); private: int fd; @@ -92,55 +92,55 @@ ssize_t sio_writev(int fd, const struct iovec *iov, int iovcnt); ssize_t sio_write_total(int fd, const void *buf, size_t count, size_t total); /** - * reads at least count bytes, buf_size maximum from fd. - * throws on error or disconnect. returns count of bytes actually read. - * returns if no activity occurred during timeout seconds - * timeout is the only reason to return value less than count parameter. + * Read at least count bytes, buf_size maximum from fd. + * Throw on error or disconnect. Return count of bytes actually read. + * Return if no activity occurred during timeout seconds. + * timeout is the only reason to return value less than count parameter. * timeout == 0 means "don't wait", timeout < 0 means "infinite wait" */ ssize_t sio_read_ahead_timeout(int fd, void *buf, size_t count, size_t buf_size, - float timeout); + ev_tstamp timeout); /** - * reads from fd, buf_size maximum bytes. - * throws on error or disconnect. returns count of bytes actually read. - * returns if no activity occurred during timeout seconds + * Read from fd, buf_size maximum bytes. + * Throw on error or disconnect. Return count of bytes actually read. + * Return if no activity occurred during timeout seconds. * timeout == 0 means "don't wait", timeout < 0 means "infinite wait" */ ssize_t -sio_read_timeout(int fd, void *buf, size_t buf_size, float timeout_ms); +sio_read_timeout(int fd, void *buf, size_t buf_size, ev_tstamp timeout_ms); /** - * reads count bytes from fd. - * throws on error or disconnect. returns count of bytes actually read. - * returns if no activity occurred during timeout seconds + * Read count bytes from fd. + * Throw on error or disconnect. Return count of bytes actually read. + * Return if no activity occurred during timeout seconds. * timeout is the only reason to return value, different from count parameter. * timeout == 0 means "don't wait", timeout < 0 means "infinite wait" */ ssize_t -sio_readn_timeout(int fd, void *buf, size_t count, float timeout); +sio_readn_timeout(int fd, void *buf, size_t count, ev_tstamp timeout); /** - * writes count bytes to fd. - * throws on error or disconnect. returns count of bytes actually written. - * returns if no activity occurred during timeout seconds + * Write count bytes to fd. + * Throw on error or disconnect. Return count of bytes actually written. + * Return if no activity occurred during timeout seconds. * timeout is the only reason to return value, different from count parameter. * timeout == 0 means "don't wait", timeout < 0 means "infinite wait" */ ssize_t -sio_writen_timeout(int fd, const void *buf, size_t count, float timeout); +sio_writen_timeout(int fd, const void *buf, size_t count, ev_tstamp timeout); /** - * wrap over sendfile. - * throws if send file failed + * A wrapper over sendfile. + * Throw if send file failed. */ ssize_t sio_sendfile(int sock_fd, int file_fd, off_t *offset, size_t size); /** - * receive a file sent by sendfile - * throws if receiving failed + * Receive a file sent by sendfile + * Throw if receiving failed */ ssize_t sio_recvfile(int sock_fd, int file_fd, off_t *offset, size_t size); diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 102f8fd9c6..947349e5a3 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -61,6 +61,7 @@ set (common_sources admin.cc cpu_feature.c replica.cc + replica.1.5.cc iproto.cc iproto_port.cc session.cc diff --git a/src/box/box.cc b/src/box/box.cc index ab20ae8726..e98dd0ab62 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -147,7 +147,11 @@ box_enter_master_or_replica_mode(struct tarantool_cfg *conf) box_process = process_replica; recovery_wait_lsn(recovery_state, recovery_state->lsn); - recovery_follow_remote(recovery_state, conf->replication_source); + if (conf->replica_1_5_mode) { + recovery_follow_remote_1_5(recovery_state, conf->replication_source); + } else { + recovery_follow_remote(recovery_state, conf->replication_source); + } snprintf(status, sizeof(status), "replica/%s%s", conf->replication_source, custom_proc_title); @@ -236,8 +240,13 @@ box_reload_config(struct tarantool_cfg *old_conf, struct tarantool_cfg *new_conf return -1; } - if (recovery_state->remote) - recovery_stop_remote(recovery_state); + if (recovery_state->remote) { + if (old_conf->replica_1_5_mode) { + recovery_stop_remote_1_5(recovery_state); + } else { + recovery_stop_remote(recovery_state); + } + } box_enter_master_or_replica_mode(new_conf); } diff --git a/src/recovery.cc b/src/recovery.cc index 91b29312c1..9a35aa6f30 100644 --- a/src/recovery.cc +++ b/src/recovery.cc @@ -329,43 +329,41 @@ init_storage_from_master(struct log_dir *dir) } addr.sin_port = htons(port); - FDHolder sock_fd_holder(sio_socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)); - sio_connect(sock_fd_holder, &addr, sizeof(addr)); - - uint32_t replica_version = default_version, master_version = 0; - sio_writen_timeout(sock_fd_holder, &replica_version, + int sock_fd = sio_socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + FDHolder sock_fd_holder(sock_fd); + sio_connect(sock_fd, &addr, sizeof(addr)); + + uint32_t replica_version[3] = { default_version, + get_package_version_packed(), 0 }; + uint32_t master_version[3] = { 0 }; + sio_writen_timeout(sock_fd, replica_version, sizeof(replica_version), 10); - sio_readn_timeout(sock_fd_holder, &master_version, + sio_readn_timeout(sock_fd, master_version, sizeof(master_version), 10); - if (master_version == 0) { - say_error("handshake %d", master_version); - return; - } else if (master_version < 12) { - say_error("master is too old %d", master_version); - return; - } else if (master_version > 256*256) { - say_error("invalid master version %d", master_version); + if (master_version[0] != default_version) { + say_error("invalid master version %d", master_version[0]); return; } uint32_t request = RPL_GET_SNAPSHOT; - sio_writen_timeout(sock_fd_holder, &request, sizeof(request), 10); + sio_writen_timeout(sock_fd, &request, sizeof(request), 10); uint64_t recv_buf[2]; - sio_readn_timeout(sock_fd_holder, recv_buf, sizeof(recv_buf), 10); + sio_readn_timeout(sock_fd, recv_buf, sizeof(recv_buf), 10); uint64_t lsn = recv_buf[0]; uint64_t file_size = recv_buf[1]; const char* filename = format_filename(dir, lsn, NONE); - FDHolder file_fd_holder(open(filename, - O_WRONLY | O_CREAT | O_EXCL | dir->open_wflags, dir->mode)); + int file_fd = open(filename, + O_WRONLY | O_CREAT | O_EXCL | dir->open_wflags, dir->mode); + FDHolder file_fd_holder(file_fd); - if (file_fd_holder < 0) { + if (file_fd < 0) { say_error("failed to create initial snapshot file"); return; } - sio_recvfile(sock_fd_holder, file_fd_holder, NULL, file_size); + sio_recvfile(sock_fd, file_fd, NULL, file_size); say_info("done"); } @@ -1388,5 +1386,35 @@ read_log(const char *filename, return 0; } +uint32_t get_package_version_packed() +{ + static uint32_t result = 0xFEFEFEFE; + if (result == 0xFEFEFEFE) { + uint32_t parts[4] = { 0 }; + size_t reading_part = 0; + for (const char *p = PACKAGE_VERSION; + *p && reading_part < sizeof(parts) / sizeof(parts[0]); ++p) { + if (*p >= '0' && *p <= '9') { + parts[reading_part] = parts[reading_part] * 10 + (*p - '0'); + } else { + reading_part++; + } + } + result = 0; + for (size_t i = 0; i < sizeof(parts) / sizeof(parts[0]); i++) { + if (parts[i] > 0xFF) { + say_warn("Package version has part, greater than 255: " + "truncating: %u.%u.%u.%u", + parts[0], parts[1], parts[2], parts[3]); + parts[i] = 0xFF; + } + result = (result << 8) | parts[i]; + } + say_info("Package version: %u.%u.%u.%u", + parts[0], parts[1], parts[2], parts[3]); + } + return result; +} + /* }}} */ diff --git a/src/replica.1.5.cc b/src/replica.1.5.cc index 2d2983b9bd..fa2b792b38 100644 --- a/src/replica.1.5.cc +++ b/src/replica.1.5.cc @@ -37,11 +37,13 @@ #include "pickle.h" #include "coio_buf.h" +static const uint32_t version_1_5 = 11; + static void -remote_apply_row(struct recovery_state *r, const char *row, uint32_t rowlne); +remote_apply_row_1_5(struct recovery_state *r, const char *row, uint32_t rowlne); -const char * -remote_read_row(struct ev_io *coio, struct iobuf *iobuf, uint32_t *rowlen) +static const char * +remote_read_row_1_5(struct ev_io *coio, struct iobuf *iobuf, uint32_t *rowlen) { struct ibuf *in = &iobuf->in; ssize_t to_read = sizeof(struct row_header) - ibuf_size(in); @@ -65,7 +67,7 @@ remote_read_row(struct ev_io *coio, struct iobuf *iobuf, uint32_t *rowlen) } static void -remote_connect(struct ev_io *coio, struct sockaddr_in *remote_addr, +remote_connect_1_5(struct ev_io *coio, struct sockaddr_in *remote_addr, int64_t initial_lsn, const char **err) { evio_socket(coio, AF_INET, SOCK_STREAM, IPPROTO_TCP); @@ -80,7 +82,7 @@ remote_connect(struct ev_io *coio, struct sockaddr_in *remote_addr, *err = "can't read version"; coio_readn(coio, &version, sizeof(version)); *err = NULL; - if (version != default_version) + if (version != version_1_5) tnt_raise(IllegalParams, "remote version mismatch"); say_crit("successfully connected to master"); @@ -88,7 +90,7 @@ remote_connect(struct ev_io *coio, struct sockaddr_in *remote_addr, } static void -pull_from_remote(va_list ap) +pull_from_remote_1_5(va_list ap) { struct recovery_state *r = va_arg(ap, struct recovery_state *); struct ev_io coio; @@ -105,20 +107,20 @@ pull_from_remote(va_list ap) if (! evio_is_active(&coio)) { if (iobuf == NULL) iobuf = iobuf_new(fiber_name(fiber)); - remote_connect(&coio, &r->remote->addr, + remote_connect_1_5(&coio, &r->remote->addr, r->confirmed_lsn + 1, &err); warning_said = false; } err = "can't read row"; uint32_t rowlen; - const char *row = remote_read_row(&coio, iobuf, &rowlen); + const char *row = remote_read_row_1_5(&coio, iobuf, &rowlen); fiber_setcancellable(false); err = NULL; r->remote->recovery_lag = ev_now() - row_header(row)->tm; r->remote->recovery_last_update_tstamp = ev_now(); - remote_apply_row(r, row, rowlen); + remote_apply_row_1_5(r, row, rowlen); iobuf_gc(iobuf); fiber_gc(); @@ -141,7 +143,7 @@ pull_from_remote(va_list ap) } static void -remote_apply_row(struct recovery_state *r, const char *row, uint32_t rowlen) +remote_apply_row_1_5(struct recovery_state *r, const char *row, uint32_t rowlen) { int64_t lsn = row_header(row)->lsn; @@ -154,7 +156,7 @@ remote_apply_row(struct recovery_state *r, const char *row, uint32_t rowlen) } void -recovery_follow_remote(struct recovery_state *r, const char *addr) +recovery_follow_remote_1_5(struct recovery_state *r, const char *addr) { char name[FIBER_NAME_MAXLEN]; char ip_addr[32]; @@ -169,7 +171,7 @@ recovery_follow_remote(struct recovery_state *r, const char *addr) snprintf(name, sizeof(name), "replica/%s", addr); try { - f = fiber_new(name, pull_from_remote); + f = fiber_new(name, pull_from_remote_1_5); } catch (const Exception& ) { return; } @@ -195,7 +197,7 @@ recovery_follow_remote(struct recovery_state *r, const char *addr) } void -recovery_stop_remote(struct recovery_state *r) +recovery_stop_remote_1_5(struct recovery_state *r) { say_info("shutting down the replica"); fiber_cancel(r->remote->reader); diff --git a/src/replica.cc b/src/replica.cc index 2ab415a906..cf49c43851 100644 --- a/src/replica.cc +++ b/src/replica.cc @@ -38,10 +38,12 @@ #include "coio_buf.h" #include "recovery.h" +static const uint32_t supported_featutes = 0; + static void remote_apply_row(struct recovery_state *r, const char *row, uint32_t rowlne); -const char * +static const char * remote_read_row(struct ev_io *coio, struct iobuf *iobuf, uint32_t *rowlen) { struct ibuf *in = &iobuf->in; @@ -74,25 +76,29 @@ remote_connect(struct ev_io *coio, struct sockaddr_in *remote_addr, *err = "can't connect to master"; coio_connect(coio, remote_addr); - uint32_t replica_version = default_version, master_version = 0; - ssize_t write_res = coio_write_timeout(coio, &replica_version, + uint32_t replica_version[3] = { default_version, + get_package_version_packed(), supported_featutes }; + uint32_t master_version[3] = { 0 }; + ssize_t write_res = coio_write_timeout(coio, replica_version, sizeof(replica_version), 1.); if(write_res != sizeof(replica_version)) { tnt_raise(IllegalParams, "handshake failed"); } - ssize_t read_res = coio_readn_ahead_timeout(coio, &master_version, + ssize_t read_res = coio_readn_ahead_timeout(coio, master_version, sizeof(master_version), sizeof(master_version), 1.); if(read_res != sizeof(master_version)) { tnt_raise(IllegalParams, "handshake failed"); } - if (master_version < 12 || master_version >= 256*256) { + if (master_version[0] != 12) { tnt_raise(IllegalParams, "invalid remote version"); } - uint32_t request = RPL_GET_WAL; - coio_write(coio, &request, sizeof(request)); - coio_write(coio, &initial_lsn, sizeof(initial_lsn)); + struct send_request { + uint32_t request_type; + int64_t initial_lsn; + } __attribute__((packed)) send_request = { RPL_GET_WAL, initial_lsn }; + coio_write(coio, &send_request, sizeof(send_request)); say_crit("successfully connected to master"); say_crit("starting replication from lsn: %" PRIi64, initial_lsn); diff --git a/src/replication.cc b/src/replication.cc index 0f10d1fcb3..38cc650bf2 100644 --- a/src/replication.cc +++ b/src/replication.cc @@ -81,6 +81,8 @@ extern "C" { */ static int master_to_spawner_socket; +static const uint32_t supported_featutes = 0; + /** Accept a new connection on the replication port: push the accepted socket * to the spawner. */ @@ -624,19 +626,20 @@ replication_relay_send_snapshot_by_file(int client_sock) local_snap_dir.dirname = cfg.snap_dir; int64_t lsn = greatest_lsn(&local_snap_dir); const char* filename = format_filename(&local_snap_dir, lsn, NONE); - FDHolder file_fd_holder(open(filename, - O_RDONLY | local_snap_dir.open_wflags, local_snap_dir.mode)); - if (file_fd_holder < 0) { + int file_fd = open(filename, + O_RDONLY | local_snap_dir.open_wflags, local_snap_dir.mode); + FDHolder file_fd_holder(file_fd); + if (file_fd < 0) { say_error("can't find/open snapshot"); exit(EXIT_FAILURE); } - uint64_t file_size = get_file_size(file_fd_holder); + uint64_t file_size = get_file_size(file_fd); uint64_t send_buf[2]; send_buf[0] = lsn; send_buf[1] = file_size; sio_writen_timeout(client_sock, send_buf, sizeof(send_buf), -1); - sio_sendfile(client_sock, file_fd_holder, 0, file_size); + sio_sendfile(client_sock, file_fd, 0, file_size); exit(EXIT_SUCCESS); } @@ -688,22 +691,24 @@ replication_relay_loop(int client_sock) say_syserror("sigaction"); } - uint32_t master_version = default_version, replica_version = 0; + uint32_t master_version[3] = { default_version, + get_package_version_packed(), supported_featutes }; + uint32_t replica_version[3] = { 0 }; ssize_t read_res = sio_readn_timeout(client_sock, - &replica_version, sizeof(replica_version), 10); + replica_version, sizeof(replica_version), 10); if (read_res != sizeof(replica_version)) { say_error("handshake failed"); exit(EXIT_FAILURE); } ssize_t write_res = sio_writen_timeout(client_sock, - &master_version, sizeof(master_version), 10); + master_version, sizeof(master_version), 10); if (write_res != sizeof(master_version)) { say_error("handshake failed"); exit(EXIT_FAILURE); } - if (replica_version < 12) { - say_error("invalid replica version! %d", replica_version); + if (replica_version[0] < 12) { + say_error("invalid replica version! %d", replica_version[0]); exit(EXIT_FAILURE); } diff --git a/src/sio.cc b/src/sio.cc index 8a9266b0d4..b8808d634a 100644 --- a/src/sio.cc +++ b/src/sio.cc @@ -84,11 +84,6 @@ void FDHolder::Reset(int _fd) fd = _fd; } -FDHolder::operator int() -{ - return fd; -} - /** Pretty print socket name and peer (for exceptions) */ const char * sio_socketname(int fd) @@ -316,7 +311,7 @@ sio_writev(int fd, const struct iovec *iov, int iovcnt) ssize_t sio_read_ahead_timeout(int fd, void *buf, size_t count, size_t buf_size, - float timeout) + ev_tstamp timeout) { pollfd pfd; pfd.events = POLLIN; @@ -346,19 +341,19 @@ sio_read_ahead_timeout(int fd, void *buf, size_t count, size_t buf_size, } ssize_t -sio_read_timeout(int fd, void *buf, size_t buf_size, float timeout) +sio_read_timeout(int fd, void *buf, size_t buf_size, ev_tstamp timeout) { return sio_read_ahead_timeout(fd, buf, 1, buf_size, timeout); } ssize_t -sio_readn_timeout(int fd, void *buf, size_t count, float timeout) +sio_readn_timeout(int fd, void *buf, size_t count, ev_tstamp timeout) { return sio_read_ahead_timeout(fd, buf, count, count, timeout); } ssize_t -sio_writen_timeout(int fd, const void *buf, size_t count, float timeout) +sio_writen_timeout(int fd, const void *buf, size_t count, ev_tstamp timeout) { pollfd pfd; pfd.events = POLLOUT; -- GitLab