Skip to content
Snippets Groups Projects
Commit 5e551381 authored by Konstantin Osipov's avatar Konstantin Osipov
Browse files

Merge remote-tracking branch 'origin/replica-join-coio'

parents 4cd683fc 632369db
No related branches found
No related tags found
No related merge requests found
...@@ -35,6 +35,7 @@ ...@@ -35,6 +35,7 @@
#include "log_io.h" #include "log_io.h"
#include "fiber.h" #include "fiber.h"
#include "scoped_guard.h"
#include "coio_buf.h" #include "coio_buf.h"
#include "recovery.h" #include "recovery.h"
#include "iproto_constants.h" #include "iproto_constants.h"
...@@ -73,42 +74,41 @@ remote_read_row(struct ev_io *coio, struct iobuf *iobuf, ...@@ -73,42 +74,41 @@ remote_read_row(struct ev_io *coio, struct iobuf *iobuf,
iproto_header_decode(row, (const char **) &in->pos, in->pos + len); iproto_header_decode(row, (const char **) &in->pos, in->pos + len);
} }
/* Blocked I/O */
static void static void
remote_read_row_fd(int sock, struct iproto_header *row) remote_write_row(struct ev_io *coio, const struct iproto_header *row)
{ {
const char *data; struct iovec iov[IPROTO_ROW_IOVMAX];
int iovcnt = iproto_row_encode(row, iov);
coio_writev(coio, iov, iovcnt, 0);
}
/* Read fixed header */ static void
char fixheader[IPROTO_FIXHEADER_SIZE]; remote_connect(struct recovery_state *r, struct ev_io *coio,
if (sio_read(sock, fixheader, sizeof(fixheader)) != sizeof(fixheader)) { struct iobuf *iobuf)
error: {
tnt_raise(ClientError, ER_INVALID_MSGPACK, char greeting[IPROTO_GREETING_SIZE];
"invalid fixed header");
}
data = fixheader;
if (mp_check(&data, data + sizeof(fixheader)) != 0)
goto error;
data = fixheader;
/* Read length */ evio_socket(coio, AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (mp_typeof(*data) != MP_UINT)
goto error;
uint32_t len = mp_decode_uint(&data);
if (len > IPROTO_BODY_LEN_MAX) {
tnt_raise(ClientError, ER_INVALID_MSGPACK,
"received packet is too big");
}
/* Read header and body */ struct port_uri *uri = &r->remote.uri;
char *bodybuf = (char *) region_alloc(&fiber()->gc, len);
if (sio_read(sock, bodybuf, len) != len) {
tnt_raise(ClientError, ER_INVALID_MSGPACK,
"invalid row - can't read");
}
data = bodybuf; coio_connect(coio, &uri->addr, uri->addr_len);
iproto_header_decode(row, &data, data + len); coio_readn(coio, greeting, sizeof(greeting));
say_crit("connected to master");
if (!r->remote.uri.login[0])
return;
/* Authenticate */
say_debug("authenticating...");
struct iproto_header row;
iproto_encode_auth(&row, greeting, uri->login, uri->password);
remote_write_row(coio, &row);
remote_read_row(coio, iobuf, &row);
iproto_decode_error(&row); /* auth failed */
/* auth successed */
say_info("authenticated");
} }
void void
...@@ -120,43 +120,26 @@ replica_bootstrap(struct recovery_state *r) ...@@ -120,43 +120,26 @@ replica_bootstrap(struct recovery_state *r)
/* Generate Server-UUID */ /* Generate Server-UUID */
tt_uuid_create(&r->server_uuid); tt_uuid_create(&r->server_uuid);
char greeting[IPROTO_GREETING_SIZE]; struct ev_io coio;
coio_init(&coio);
uint64_t sync = rand(); struct iobuf *iobuf = iobuf_new(fiber_name(fiber()));
struct iovec iov[IPROTO_ROW_IOVMAX]; auto coio_guard = make_scoped_guard([&] {
struct iproto_header row; iobuf_delete(iobuf);
evio_close(loop(), &coio);
});
int master = sio_socket(r->remote.uri.addr.sa_family, remote_connect(r, &coio, iobuf);
SOCK_STREAM, IPPROTO_TCP);
FDGuard guard(master);
sio_connect(master, &r->remote.uri.addr, r->remote.uri.addr_len);
sio_readn(master, greeting, sizeof(greeting));
if (*r->remote.uri.login) {
/* Authenticate */
iproto_encode_auth(&row, greeting, r->remote.uri.login,
r->remote.uri.password);
int iovcnt = iproto_row_encode(&row, iov);
sio_writev_all(master, iov, iovcnt);
remote_read_row_fd(master, &row);
iproto_decode_error(&row); /* auth failed */
/* auth successed */
say_info("authenticated with master");
}
/* Send JOIN request */ /* Send JOIN request */
iproto_encode_join(&row, &recovery_state->server_uuid); struct iproto_header row;
row.sync = sync; iproto_encode_join(&row, &r->server_uuid);
int iovcnt = iproto_row_encode(&row, iov); remote_write_row(&coio, &row);
sio_writev_all(master, iov, iovcnt);
/* Add a surrogate server id for snapshot rows */ /* Add a surrogate server id for snapshot rows */
vclock_add_server(&r->vclock, 0); vclock_add_server(&r->vclock, 0);
while (true) { while (true) {
remote_read_row_fd(master, &row); remote_read_row(&coio, iobuf, &row);
if (iproto_request_is_dml(row.type)) { if (iproto_request_is_dml(row.type)) {
/* Regular snapshot row (IPROTO_INSERT) */ /* Regular snapshot row (IPROTO_INSERT) */
...@@ -182,43 +165,6 @@ replica_bootstrap(struct recovery_state *r) ...@@ -182,43 +165,6 @@ replica_bootstrap(struct recovery_state *r)
/* master socket closed by guard */ /* master socket closed by guard */
} }
static void
remote_connect(struct recovery_state *r, struct ev_io *coio,
struct iobuf *iobuf, const char **err)
{
char greeting[IPROTO_GREETING_SIZE];
struct iovec iov[IPROTO_ROW_IOVMAX];
evio_socket(coio, AF_INET, SOCK_STREAM, IPPROTO_TCP);
struct port_uri *uri = &r->remote.uri;
*err = "can't connect to master";
coio_connect(coio, &uri->addr, uri->addr_len);
coio_readn(coio, greeting, sizeof(greeting));
if (*r->remote.uri.login) {
/* Authenticate */
say_debug("authenticating...");
struct iproto_header row;
iproto_encode_auth(&row, greeting, uri->login, uri->password);
int iovcnt = iproto_row_encode(&row, iov);
coio_writev(coio, iov, iovcnt, 0);
remote_read_row(coio, iobuf, &row);
iproto_decode_error(&row); /* auth failed */
/* auth successed */
say_info("authenticated");
}
/* Send SUBSCRIBE request */
struct iproto_header row;
iproto_encode_subscribe(&row, &cluster_id, &r->server_uuid, &r->vclock);
int iovcnt = iproto_row_encode(&row, iov);
coio_writev(coio, iov, iovcnt, 0);
say_crit("connected to master");
}
static void static void
pull_from_remote(va_list ap) pull_from_remote(va_list ap)
{ {
...@@ -234,19 +180,25 @@ pull_from_remote(va_list ap) ...@@ -234,19 +180,25 @@ pull_from_remote(va_list ap)
for (;;) { for (;;) {
const char *err = NULL; const char *err = NULL;
try { try {
struct iproto_header row;
fiber_setcancellable(true); fiber_setcancellable(true);
if (! evio_is_active(&coio)) { if (! evio_is_active(&coio)) {
title("replica", "%s/%s", r->remote.source, title("replica", "%s/%s", r->remote.source,
"connecting"); "connecting");
if (iobuf == NULL) if (iobuf == NULL)
iobuf = iobuf_new(fiber_name(fiber())); iobuf = iobuf_new(fiber_name(fiber()));
remote_connect(r, &coio, iobuf, &err); err = "can't connect to master";
remote_connect(r, &coio, iobuf);
/* Send SUBSCRIBE request */
err = "can't subscribe to master";
iproto_encode_subscribe(&row, &cluster_id,
&r->server_uuid, &r->vclock);
remote_write_row(&coio, &row);
warning_said = false; warning_said = false;
title("replica", "%s/%s", r->remote.source, title("replica", "%s/%s", r->remote.source,
"connected"); "connected");
} }
err = "can't read row"; err = "can't read row";
struct iproto_header row;
remote_read_row(&coio, iobuf, &row); remote_read_row(&coio, iobuf, &row);
if (!iproto_request_is_dml(row.type)) if (!iproto_request_is_dml(row.type))
iproto_decode_error(&row); /* error */ iproto_decode_error(&row); /* error */
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment