diff --git a/CMakeLists.txt b/CMakeLists.txt index c3cbc939757a9e5b149128021750cbb3283ca9ba..c18672f6a9927018f9a9eac4849829aa09111594 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,6 +16,7 @@ include(CheckCSourceCompiles) include(CheckCXXSourceCompiles) include(TestBigEndian) include(CheckFunctionExists) +include(CheckPrototypeDefinition) include(FindOptionalPackage) find_program(ECHO echo) @@ -77,6 +78,12 @@ check_symbol_exists(O_DSYNC fcntl.h HAVE_O_DSYNC) check_function_exists(fdatasync HAVE_FDATASYNC) check_function_exists(memmem HAVE_MEMMEM) check_function_exists(memrchr HAVE_MEMRCHR) +check_function_exists(sendfile HAVE_SENDFILE) +if (HAVE_SENDFILE) +message (STATUS "sendfile found, let's find what version was found") +check_prototype_definition(sendfile "ssize_t sendfile(int out_fd, int in_fd, off_t * offset, size_t count)" "0" "sys/sendfile.h" HAVE_SENDFILE_LINUX) +check_prototype_definition(sendfile "int sendfile(int fd, int s, off_t offset, size_t nbytes, struct sf_hdtr *hdtr, off_t *sbytes, int flags)" "0" "sys/types.h;sys/socket.h;sys/uio.h" HAVE_SENDFILE_BSD) # headers that declare off_t, struct sf_hdtr and sendfile on BSD +endif () # # Some versions of GNU libc define non-portable __libc_stack_end diff --git a/include/recovery.h b/include/recovery.h index 9be49b7a4e9cbca9b6ad9edc0f9099fde4a680d4..312759cda30c52e73c2566de21420850783ff04b 100644 --- a/include/recovery.h +++ b/include/recovery.h @@ -137,9 +137,9 @@ int read_log(const char *filename, void recovery_follow_remote(struct recovery_state *r, const char *addr); void recovery_stop_remote(struct recovery_state *r); -enum replica_to_master_request { - NORMAL_REPLICA = 0, - SNAPSHOT_REQUEST_BY_FILE +enum rpl_request_type { + RPL_GET_WAL = 0, + RPL_GET_SNAPSHOT }; struct fio_batch; diff --git a/include/sio.h b/include/sio.h index 5b10b506ccd50f586127dc26be1ad172af74aacf..f3aa600d2bcf7ba10325acaef1bfb6f3a4645df8 100644 --- a/include/sio.h +++ b/include/sio.h @@ -48,6 +48,20 @@ class SocketError: public SystemError { const char *format, ...); }; +class FDHolder { +public: + explicit 
FDHolder(int _fd = -1); + ~FDHolder(); + int Release(); + void Reset(int _fd = -1); + operator int(); + +private: + int fd; + FDHolder(const FDHolder&); + void operator=(const FDHolder&); +}; + const char *sio_socketname(int fd); int sio_socket(int domain, int type, int protocol); @@ -77,38 +91,59 @@ ssize_t sio_writev(int fd, const struct iovec *iov, int iovcnt); ssize_t sio_write_total(int fd, const void *buf, size_t count, size_t total); -/* - * timeout procedures: throw or return (depends on throw_on_timeout) if -* no activity occurred during timeout_ms milliseconds - * timeout_ms == 0 means "don't wait" - * timeout_ms < 0 means "infinite wait" +/** + * reads at least count bytes, buf_size maximum from fd. + * throws on error or disconnect. returns count of bytes actually read. + * returns if no activity occurred during timeout seconds + * timeout is the only reason to return value less than count parameter. + * timeout == 0 means "don't wait", timeout < 0 means "infinite wait" */ ssize_t sio_read_ahead_timeout(int fd, void *buf, size_t count, size_t buf_size, - int timeout_ms, bool throw_on_timeout); + float timeout); + +/** + * reads from fd, buf_size maximum bytes. + * throws on error or disconnect. returns count of bytes actually read. + * returns if no activity occurred during timeout seconds + * timeout == 0 means "don't wait", timeout < 0 means "infinite wait" + */ ssize_t -sio_read_timeout(int fd, void *buf, size_t buf_size, int timeout_ms, - bool throw_on_timeout); +sio_read_timeout(int fd, void *buf, size_t buf_size, float timeout); + +/** + * reads count bytes from fd. + * throws on error or disconnect. returns count of bytes actually read. + * returns if no activity occurred during timeout seconds + * timeout is the only reason to return value, different from count parameter. 
+ * timeout == 0 means "don't wait", timeout < 0 means "infinite wait" + */ ssize_t -sio_readn_timeout(int fd, void *buf, size_t count, int timeout_ms, - bool throw_on_timeout); +sio_readn_timeout(int fd, void *buf, size_t count, float timeout); + +/** + * writes count bytes to fd. + * throws on error or disconnect. returns count of bytes actually written. + * returns if no activity occurred during timeout seconds + * timeout is the only reason to return value, different from count parameter. + * timeout == 0 means "don't wait", timeout < 0 means "infinite wait" + */ ssize_t -sio_write_timeout(int fd, const void *buf, size_t count, int timeout_ms, - bool throw_on_timeout); +sio_writen_timeout(int fd, const void *buf, size_t count, float timeout); -/* - * send a file size and a file to the socket. - * meaning of offset and size parameters are same as is linux sendfile, - * with added feature: if size is negative, the file is sent till eof +/** + * wrap over sendfile. + * throws if send file failed */ ssize_t -sio_sendfile(int sock_fd, int file_fd, off_t *offset, ssize_t size); -/* - * receive a file sent by sio_sendfile - * simply writes data to the file_fd, thus updates file offset -*/ +sio_sendfile(int sock_fd, int file_fd, off_t *offset, size_t size); + +/** + * receive a file sent by sendfile + * throws if receiving failed + */ ssize_t -sio_recvfile(int sock_fd, int file_fd); +sio_recvfile(int sock_fd, int file_fd, off_t *offset, size_t size); ssize_t sio_sendto(int fd, const void *buf, size_t len, int flags, diff --git a/include/tarantool/config.h.cmake b/include/tarantool/config.h.cmake index fd180127adc4fc30454f6e66700e6b2006f4e7fa..bd43b709e20cdd0fb59d6b14395d6ae77e7b2803 100644 --- a/include/tarantool/config.h.cmake +++ b/include/tarantool/config.h.cmake @@ -65,6 +65,18 @@ * Defined if this platform has GNU specific memrchr(). */ #cmakedefine HAVE_MEMRCHR 1 +/* + * Defined if this platform has sendfile(..). 
+ */ +#cmakedefine HAVE_SENDFILE 1 +/* + * Defined if this platform has Linux specific sendfile(..). + */ +#cmakedefine HAVE_SENDFILE_LINUX 1 +/* + * Defined if this platform has BSD specific sendfile(..). + */ +#cmakedefine HAVE_SENDFILE_BSD 1 /* * Set if this is a GNU system and libc has __libc_stack_end. */ diff --git a/src/recovery.cc b/src/recovery.cc index cd149606536eeaefb942b5161fc85ad108f9a1a2..91b29312c1be3a34efb5f5e36efc231ef20807f8 100644 --- a/src/recovery.cc +++ b/src/recovery.cc @@ -329,15 +329,14 @@ init_storage_from_master(struct log_dir *dir) } addr.sin_port = htons(port); - int sock_fd = sio_socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); - auto sock_fd_holder = make_scoped_guard([=] { close(sock_fd); }); - sio_connect(sock_fd, &addr, sizeof(addr)); + FDHolder sock_fd_holder(sio_socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)); + sio_connect(sock_fd_holder, &addr, sizeof(addr)); uint32_t replica_version = default_version, master_version = 0; - sio_write_timeout(sock_fd, &replica_version, sizeof(replica_version), - 10000, true); - sio_readn_timeout(sock_fd, &master_version, sizeof(master_version), - 10000, true); + sio_writen_timeout(sock_fd_holder, &replica_version, + sizeof(replica_version), 10); + sio_readn_timeout(sock_fd_holder, &master_version, + sizeof(master_version), 10); if (master_version == 0) { say_error("handshake %d", master_version); @@ -350,22 +349,29 @@ init_storage_from_master(struct log_dir *dir) return; } - uint32_t request = SNAPSHOT_REQUEST_BY_FILE; - sio_write_timeout(sock_fd, &request, sizeof(request), 10000, true); + uint32_t request = RPL_GET_SNAPSHOT; + sio_writen_timeout(sock_fd_holder, &request, sizeof(request), 10); - uint64_t lsn; - sio_readn_timeout(sock_fd, &lsn, sizeof(lsn), 10000, true); + uint64_t recv_buf[2]; + /* a timeout yields a short read: never use a partially filled header */ + ssize_t recv_res = sio_readn_timeout(sock_fd_holder, recv_buf, + sizeof(recv_buf), 10); + if (recv_res != (ssize_t)sizeof(recv_buf)) { + say_error("failed to receive snapshot header"); + return; + } + uint64_t lsn = recv_buf[0]; + uint64_t file_size = recv_buf[1]; const char* filename = format_filename(dir, lsn, NONE); - int file_fd = 
open(filename, - O_WRONLY | O_CREAT | O_EXCL | dir->open_wflags, dir->mode); + FDHolder file_fd_holder(open(filename, + O_WRONLY | O_CREAT | O_EXCL | dir->open_wflags, dir->mode)); - if (file_fd < 0) { + if (file_fd_holder < 0) { say_error("failed to create initial snapshot file"); return; } - auto file_fd_holder = make_scoped_guard([=] { close(file_fd); }); - sio_recvfile(sock_fd, file_fd); + sio_recvfile(sock_fd_holder, file_fd_holder, NULL, file_size); say_info("done"); } diff --git a/src/replica.cc b/src/replica.cc index cb8c3c3837029aa15bc30cca7c8fef0f4ae4422e..2ab415a906dc9ce31ac6c0b60f7b04d032d3ee86 100644 --- a/src/replica.cc +++ b/src/replica.cc @@ -75,27 +75,25 @@ remote_connect(struct ev_io *coio, struct sockaddr_in *remote_addr, coio_connect(coio, remote_addr); uint32_t replica_version = default_version, master_version = 0; - coio_write(coio, &replica_version, sizeof(replica_version)); - coio_readn_ahead_timeout(coio, &master_version, + ssize_t write_res = coio_write_timeout(coio, &replica_version, + sizeof(replica_version), 1.); + if(write_res != sizeof(replica_version)) { + tnt_raise(IllegalParams, "handshake failed"); + } + ssize_t read_res = coio_readn_ahead_timeout(coio, &master_version, sizeof(master_version), sizeof(master_version), 1.); + if(read_res != sizeof(master_version)) { + tnt_raise(IllegalParams, "handshake failed"); + } - if (master_version < 12) { - evio_close(coio); - evio_socket(coio, AF_INET, SOCK_STREAM, IPPROTO_TCP); - *err = "can't connect to master"; - coio_connect(coio, remote_addr); - - coio_write(coio, &initial_lsn, sizeof(initial_lsn)); - coio_readn(coio, &master_version, sizeof(master_version)); - } else { - if (master_version >= 256*256) - tnt_raise(IllegalParams, "invalid remote version"); - - uint32_t request = NORMAL_REPLICA; - coio_write(coio, &request, sizeof(request)); - coio_write(coio, &initial_lsn, sizeof(initial_lsn)); + if (master_version < 12 || master_version >= 256*256) { + tnt_raise(IllegalParams, "invalid 
remote version"); } + uint32_t request = RPL_GET_WAL; + coio_write(coio, &request, sizeof(request)); + coio_write(coio, &initial_lsn, sizeof(initial_lsn)); + say_crit("successfully connected to master"); say_crit("starting replication from lsn: %" PRIi64, initial_lsn); } diff --git a/src/replication.cc b/src/replication.cc index 3b4ca1a66153ff50d77f8ff0ef757589b1828fad..0f10d1fcb353bccfd9c447e1471917eb6fba8135 100644 --- a/src/replication.cc +++ b/src/replication.cc @@ -605,25 +605,38 @@ replication_relay_send_row(void *param, const char *row, uint32_t rowlen) exit(EXIT_SUCCESS); } +static size_t +get_file_size(int file_fd) +{ + struct stat st; + if (fstat(file_fd, &st) != 0) { + return 0; + } + return st.st_size; +} + static void replication_relay_send_snapshot_by_file(int client_sock) { - auto sock_fd_holder = make_scoped_guard([=] { close(client_sock); }); + FDHolder sock_fd_holder(client_sock); struct log_dir local_snap_dir; memcpy(&local_snap_dir, &snap_dir, sizeof(local_snap_dir)); local_snap_dir.dirname = cfg.snap_dir; int64_t lsn = greatest_lsn(&local_snap_dir); const char* filename = format_filename(&local_snap_dir, lsn, NONE); - int file_fd = open(filename, - O_RDONLY | local_snap_dir.open_wflags, local_snap_dir.mode); - if (file_fd < 0) { + FDHolder file_fd_holder(open(filename, + O_RDONLY | local_snap_dir.open_wflags, local_snap_dir.mode)); + if (file_fd_holder < 0) { say_error("can't find/open snapshot"); exit(EXIT_FAILURE); } - auto file_fd_holder = make_scoped_guard([=] { close(file_fd); }); - sio_write_timeout(client_sock, &lsn, sizeof(lsn), -1, true); - sio_sendfile(client_sock, file_fd, 0, -1); + uint64_t file_size = get_file_size(file_fd_holder); + uint64_t send_buf[2]; + send_buf[0] = lsn; + send_buf[1] = file_size; + sio_writen_timeout(client_sock, send_buf, sizeof(send_buf), -1); + sio_sendfile(client_sock, file_fd_holder, 0, file_size); exit(EXIT_SUCCESS); } @@ -676,32 +689,34 @@ replication_relay_loop(int client_sock) } uint32_t 
master_version = default_version, replica_version = 0; - sio_readn_timeout(client_sock, - &replica_version, sizeof(replica_version), -1, true); - sio_write_timeout(client_sock, - &master_version, sizeof(master_version), -1, true); + ssize_t read_res = sio_readn_timeout(client_sock, + &replica_version, sizeof(replica_version), 10); + if (read_res != sizeof(replica_version)) { + say_error("handshake failed"); + exit(EXIT_FAILURE); + } + ssize_t write_res = sio_writen_timeout(client_sock, + &master_version, sizeof(master_version), 10); + if (write_res != sizeof(master_version)) { + say_error("handshake failed"); + exit(EXIT_FAILURE); + } if (replica_version < 12) { - union lsn_union { - int64_t lsn; - uint32_t dummy[2]; - } lsn_hint; - lsn_hint.dummy[0] = replica_version; - sio_readn_timeout(client_sock, &lsn_hint.dummy[1], - sizeof(lsn) - sizeof(replica_version), -1, true); - lsn = lsn_hint.lsn; - } else { - uint32_t request; - sio_readn_timeout(client_sock, &request, sizeof(request), -1, true); - if (request == SNAPSHOT_REQUEST_BY_FILE) { - replication_relay_send_snapshot_by_file(client_sock); /*exits*/ - } - if (request != NORMAL_REPLICA) { - say_error("unknown replica request: %d", request); - exit(EXIT_FAILURE); - } - sio_readn_timeout(client_sock, &lsn, sizeof(lsn), -1, true); + say_error("invalid replica version! 
%d", replica_version); + exit(EXIT_FAILURE); + } + + uint32_t request; + sio_readn_timeout(client_sock, &request, sizeof(request), -1); + if (request == RPL_GET_SNAPSHOT) { + replication_relay_send_snapshot_by_file(client_sock); /*exits*/ + } + if (request != RPL_GET_WAL) { + say_error("unknown replica request: %d", request); + exit(EXIT_FAILURE); } + sio_readn_timeout(client_sock, &lsn, sizeof(lsn), -1); /* init libev events handlers */ ev_default_loop(0); diff --git a/src/sio.cc b/src/sio.cc index 545f0eb012c7aeabd2ec837de53157f836b28c9b..8a9266b0d4fc67d9e63f5b24e26c33a375291176 100644 --- a/src/sio.cc +++ b/src/sio.cc @@ -60,6 +60,35 @@ SocketError::SocketError(const char *file, unsigned line, int fd, errno = save_errno; } +FDHolder::FDHolder(int _fd) : fd(_fd) +{ +} + +FDHolder::~FDHolder() +{ + Reset(); +} + +int FDHolder::Release() +{ + int result = fd; + fd = -1; + return result; +} + +void FDHolder::Reset(int _fd) +{ + if(fd >= 0) { + close(fd); + } + fd = _fd; +} + +FDHolder::operator int() +{ + return fd; +} + /** Pretty print socket name and peer (for exceptions) */ const char * sio_socketname(int fd) @@ -287,26 +316,27 @@ sio_writev(int fd, const struct iovec *iov, int iovcnt) ssize_t sio_read_ahead_timeout(int fd, void *buf, size_t count, size_t buf_size, - int timeout_ms, bool throw_on_timeout) + float timeout) { pollfd pfd; pfd.events = POLLIN; pfd.fd = fd; size_t read_count = 0; do { - int poll_res = poll(&pfd, 1, timeout_ms); + int poll_res = poll(&pfd, 1, (int)(timeout * 1000)); /* seconds -> ms; scale before truncating so sub-second timeouts survive */ if (poll_res <= 0) { - if (throw_on_timeout) { - tnt_raise(SocketError, fd, "read poll timeout"); - } else { - return read_count; - } + return read_count; } if (pfd.revents & (~(short)POLLIN)) { tnt_raise(SocketError, fd, "poll error %x", (int)pfd.revents); } ssize_t read_res = read(fd, ((int8_t *)buf) + read_count, buf_size - read_count); + if(read_res < 0) { + if(errno == EWOULDBLOCK || errno == EINTR || errno == EAGAIN) { + continue; + } + } if (read_res <= 0) { 
tnt_raise(SocketError, fd, "read (%zd)", count); } @@ -316,43 +346,39 @@ sio_read_ahead_timeout(int fd, void *buf, size_t count, size_t buf_size, } ssize_t -sio_read_timeout(int fd, void *buf, size_t buf_size, int timeout_ms, - bool throw_on_timeout) +sio_read_timeout(int fd, void *buf, size_t buf_size, float timeout) { - return sio_read_ahead_timeout(fd, buf, 1, buf_size, timeout_ms, - throw_on_timeout); + return sio_read_ahead_timeout(fd, buf, 1, buf_size, timeout); } ssize_t -sio_readn_timeout(int fd, void *buf, size_t count, int timeout_ms, - bool throw_on_timeout) +sio_readn_timeout(int fd, void *buf, size_t count, float timeout) { - return sio_read_ahead_timeout(fd, buf, count, count, timeout_ms, - throw_on_timeout); + return sio_read_ahead_timeout(fd, buf, count, count, timeout); } ssize_t -sio_write_timeout(int fd, const void *buf, size_t count, int timeout_ms, - bool throw_on_timeout) +sio_writen_timeout(int fd, const void *buf, size_t count, float timeout) { pollfd pfd; pfd.events = POLLOUT; pfd.fd = fd; size_t write_count = 0; do { - int poll_res = poll(&pfd, 1, timeout_ms); + int poll_res = poll(&pfd, 1, (int)(timeout * 1000)); /* seconds -> ms; scale before truncating so sub-second timeouts survive */ if (poll_res <= 0) { - if (throw_on_timeout) { - tnt_raise(SocketError, fd, "write poll timeout"); - } else { - return write_count; - } + return write_count; } if (pfd.revents & (~(short)POLLOUT)) { tnt_raise(SocketError, fd, "poll error %x", (int)pfd.revents); } ssize_t write_res = write(fd, ((int8_t *)buf) + write_count, count - write_count); + if(write_res < 0) { + if(errno == EWOULDBLOCK || errno == EINTR || errno == EAGAIN) { + continue; + } + } if (write_res <= 0) { tnt_raise(SocketError, fd, "write (%zd)", count); } @@ -361,56 +387,48 @@ sio_write_timeout(int fd, const void *buf, size_t count, int timeout_ms, return write_count; } +#if defined(HAVE_SENDFILE_LINUX) ssize_t -sio_sendfile(int sock_fd, int file_fd, off_t *offset, ssize_t size) -{ - uint64_t to_send; - if(size >= 0) { - to_send = size; - } else { - struct stat 
st; - if (fstat(file_fd, &st)) { - tnt_raise(SocketError, sock_fd, "sendfile: fstat"); - } - if (offset != NULL) { - if (*offset > st.st_size) { - tnt_raise(SocketError, sock_fd, "sendfile: wrong offset"); - } - to_send = st.st_size - *offset; - } else { - off_t lseek_res = lseek(file_fd, 0, SEEK_CUR); - if(lseek_res == (off_t)-1) { - tnt_raise(SocketError, sock_fd, "sendfile: lseek"); - } - if (lseek_res > st.st_size) { - tnt_raise(SocketError, sock_fd, "sendfile: panic"); - } - to_send = st.st_size - lseek_res; - } +sio_sendfile(int sock_fd, int file_fd, off_t *offset, size_t size) +{ + ssize_t send_res = sendfile(sock_fd, file_fd, offset, size); + if(send_res < 0 || (size_t)send_res < size) { /* test -1 explicitly: ssize_t vs size_t comparison would convert it to SIZE_MAX */ + tnt_raise(SocketError, sock_fd, "sendfile"); } - - sio_write_timeout(sock_fd, &to_send, sizeof(to_send), -1, true); - -#ifdef TARGET_OS_LINUX - ssize_t send_res = sendfile(sock_fd, file_fd, offset, to_send); - if(send_res < to_send) { + return send_res; +} +#elif defined(HAVE_SENDFILE_BSD) +ssize_t +sio_sendfile(int sock_fd, int file_fd, off_t *offset, size_t size) +{ + off_t sent_bytes = 0; + int send_res = + sendfile(file_fd, sock_fd, offset ? *offset : 0, size, NULL, &sent_bytes, 0); /* BSD order: file fd first, then socket; offset is passed by value (NULL -> start of file); flags is an int */ + if(send_res != 0) { + tnt_raise(SocketError, sock_fd, "sendfile"); + } + if((size_t)sent_bytes < size) { tnt_raise(SocketError, sock_fd, "sendfile"); } - return send_res; + return sent_bytes; +} #else +ssize_t +sio_sendfile(int sock_fd, int file_fd, off_t *offset, size_t size) +{ if(offset) { if(lseek(file_fd, *offset, SEEK_SET) == (off_t)-1) { tnt_raise(SocketError, sock_fd, "sendfile: lseek"); } } - const size_t buffer_size = 4096; + const size_t buffer_size = 8192; int8_t buffer[buffer_size]; size_t bytes_sent = 0; - while (bytes_sent < to_send) { - size_t to_send_now = MIN(to_send - bytes_sent, buffer_size); - ssize_t n = sio_read_timeout(file_fd, buffer, to_send_now, -1, true); - sio_write_timeout(sock_fd, buffer, n, -1, true); + while (bytes_sent < size) { + size_t to_send_now = MIN(size - bytes_sent, buffer_size); + ssize_t n = 
sio_read_timeout(file_fd, buffer, to_send_now, -1); + sio_writen_timeout(sock_fd, buffer, n, -1); bytes_sent += n; } @@ -421,25 +439,34 @@ sio_sendfile(int sock_fd, int file_fd, off_t *offset, ssize_t size) } return bytes_sent; -#endif /* #ifdef TARGET_OS_LINUX */ } +#endif ssize_t -sio_recvfile(int sock_fd, int file_fd) +sio_recvfile(int sock_fd, int file_fd, off_t *offset, size_t size) { - const size_t buffer_size = 4096; - int8_t buffer[buffer_size]; - - uint64_t to_read; - sio_readn_timeout(sock_fd, &to_read, sizeof(to_read), -1, true); + if(offset) { + if(lseek(file_fd, *offset, SEEK_SET) == (off_t)-1) { + tnt_raise(SocketError, sock_fd, "sendfile: lseek"); + } + } + const size_t buffer_size = 8192; + int8_t buffer[buffer_size]; size_t bytes_read = 0; - while (bytes_read < to_read) { - size_t to_read_now = MIN(to_read - bytes_read, buffer_size); - ssize_t n = sio_read_timeout(sock_fd, buffer, to_read_now, -1, true); - sio_write_timeout(file_fd, buffer, n, -1, true); + while (bytes_read < size) { + size_t to_read_now = MIN(size - bytes_read, buffer_size); + ssize_t n = sio_read_timeout(sock_fd, buffer, to_read_now, -1); + sio_writen_timeout(file_fd, buffer, n, -1); bytes_read += n; } + + if(offset) { + if(lseek(file_fd, *offset, SEEK_SET) == (off_t)-1) { + tnt_raise(SocketError, sock_fd, "sendfile: lseek"); + } + } + return bytes_read; }