diff --git a/src/box/replica.cc b/src/box/replica.cc index 23ca0e26cbbe171b43f1c7c5d0b139e94ce38ce7..112bc0f2b9cb8b74aa271d70916b2062e6828e98 100644 --- a/src/box/replica.cc +++ b/src/box/replica.cc @@ -35,6 +35,7 @@ #include "log_io.h" #include "fiber.h" +#include "scoped_guard.h" #include "coio_buf.h" #include "recovery.h" #include "iproto_constants.h" @@ -73,42 +74,41 @@ remote_read_row(struct ev_io *coio, struct iobuf *iobuf, iproto_header_decode(row, (const char **) &in->pos, in->pos + len); } -/* Blocked I/O */ static void -remote_read_row_fd(int sock, struct iproto_header *row) +remote_write_row(struct ev_io *coio, const struct iproto_header *row) { - const char *data; + struct iovec iov[IPROTO_ROW_IOVMAX]; + int iovcnt = iproto_row_encode(row, iov); + coio_writev(coio, iov, iovcnt, 0); +} - /* Read fixed header */ - char fixheader[IPROTO_FIXHEADER_SIZE]; - if (sio_read(sock, fixheader, sizeof(fixheader)) != sizeof(fixheader)) { -error: - tnt_raise(ClientError, ER_INVALID_MSGPACK, - "invalid fixed header"); - } - data = fixheader; - if (mp_check(&data, data + sizeof(fixheader)) != 0) - goto error; - data = fixheader; +static void +remote_connect(struct recovery_state *r, struct ev_io *coio, + struct iobuf *iobuf) +{ + char greeting[IPROTO_GREETING_SIZE]; - /* Read length */ - if (mp_typeof(*data) != MP_UINT) - goto error; - uint32_t len = mp_decode_uint(&data); - if (len > IPROTO_BODY_LEN_MAX) { - tnt_raise(ClientError, ER_INVALID_MSGPACK, - "received packet is too big"); - } + evio_socket(coio, AF_INET, SOCK_STREAM, IPPROTO_TCP); - /* Read header and body */ - char *bodybuf = (char *) region_alloc(&fiber()->gc, len); - if (sio_read(sock, bodybuf, len) != len) { - tnt_raise(ClientError, ER_INVALID_MSGPACK, - "invalid row - can't read"); - } + struct port_uri *uri = &r->remote.uri; - data = bodybuf; - iproto_header_decode(row, &data, data + len); + coio_connect(coio, &uri->addr, uri->addr_len); + coio_readn(coio, greeting, sizeof(greeting)); + + say_crit("connected to master"); + if (!r->remote.uri.login[0]) + return; + + /* Authenticate */ + say_debug("authenticating..."); + struct iproto_header row; + iproto_encode_auth(&row, greeting, uri->login, uri->password); + remote_write_row(coio, &row); + remote_read_row(coio, iobuf, &row); + iproto_decode_error(&row); /* auth failed */ + + /* auth successed */ + say_info("authenticated"); } void @@ -120,43 +120,26 @@ replica_bootstrap(struct recovery_state *r) /* Generate Server-UUID */ tt_uuid_create(&r->server_uuid); - char greeting[IPROTO_GREETING_SIZE]; - - uint64_t sync = rand(); - struct iovec iov[IPROTO_ROW_IOVMAX]; - struct iproto_header row; + struct ev_io coio; + coio_init(&coio); + struct iobuf *iobuf = iobuf_new(fiber_name(fiber())); + auto coio_guard = make_scoped_guard([&] { + iobuf_delete(iobuf); + evio_close(loop(), &coio); + }); - int master = sio_socket(r->remote.uri.addr.sa_family, - SOCK_STREAM, IPPROTO_TCP); - FDGuard guard(master); - - sio_connect(master, &r->remote.uri.addr, r->remote.uri.addr_len); - sio_readn(master, greeting, sizeof(greeting)); - - if (*r->remote.uri.login) { - /* Authenticate */ - iproto_encode_auth(&row, greeting, r->remote.uri.login, - r->remote.uri.password); - int iovcnt = iproto_row_encode(&row, iov); - sio_writev_all(master, iov, iovcnt); - remote_read_row_fd(master, &row); - iproto_decode_error(&row); /* auth failed */ - /* auth successed */ - say_info("authenticated with master"); - } + remote_connect(r, &coio, iobuf); /* Send JOIN request */ - iproto_encode_join(&row, &recovery_state->server_uuid); - row.sync = sync; - int iovcnt = iproto_row_encode(&row, iov); - - sio_writev_all(master, iov, iovcnt); + struct iproto_header row; + iproto_encode_join(&row, &r->server_uuid); + remote_write_row(&coio, &row); /* Add a surrogate server id for snapshot rows */ vclock_add_server(&r->vclock, 0); while (true) { - remote_read_row_fd(master, &row); + remote_read_row(&coio, iobuf, &row); if (iproto_request_is_dml(row.type)) { /* Regular snapshot row (IPROTO_INSERT) */ @@ -182,43 +165,6 @@ replica_bootstrap(struct recovery_state *r) /* master socket closed by guard */ } -static void -remote_connect(struct recovery_state *r, struct ev_io *coio, - struct iobuf *iobuf, const char **err) -{ - char greeting[IPROTO_GREETING_SIZE]; - struct iovec iov[IPROTO_ROW_IOVMAX]; - - evio_socket(coio, AF_INET, SOCK_STREAM, IPPROTO_TCP); - - struct port_uri *uri = &r->remote.uri; - - *err = "can't connect to master"; - coio_connect(coio, &uri->addr, uri->addr_len); - coio_readn(coio, greeting, sizeof(greeting)); - - if (*r->remote.uri.login) { - /* Authenticate */ - say_debug("authenticating..."); - struct iproto_header row; - iproto_encode_auth(&row, greeting, uri->login, uri->password); - int iovcnt = iproto_row_encode(&row, iov); - coio_writev(coio, iov, iovcnt, 0); - remote_read_row(coio, iobuf, &row); - iproto_decode_error(&row); /* auth failed */ - /* auth successed */ - say_info("authenticated"); - } - - /* Send SUBSCRIBE request */ - struct iproto_header row; - iproto_encode_subscribe(&row, &cluster_id, &r->server_uuid, &r->vclock); - int iovcnt = iproto_row_encode(&row, iov); - coio_writev(coio, iov, iovcnt, 0); - - say_crit("connected to master"); -} - static void pull_from_remote(va_list ap) { @@ -234,19 +180,25 @@ pull_from_remote(va_list ap) for (;;) { const char *err = NULL; try { + struct iproto_header row; fiber_setcancellable(true); if (! evio_is_active(&coio)) { title("replica", "%s/%s", r->remote.source, "connecting"); if (iobuf == NULL) iobuf = iobuf_new(fiber_name(fiber())); - remote_connect(r, &coio, iobuf, &err); + err = "can't connect to master"; + remote_connect(r, &coio, iobuf); + /* Send SUBSCRIBE request */ + err = "can't subscribe to master"; + iproto_encode_subscribe(&row, &cluster_id, + &r->server_uuid, &r->vclock); + remote_write_row(&coio, &row); warning_said = false; title("replica", "%s/%s", r->remote.source, "connected"); } err = "can't read row"; - struct iproto_header row; remote_read_row(&coio, iobuf, &row); if (!iproto_request_is_dml(row.type)) iproto_decode_error(&row); /* error */