Skip to content
Snippets Groups Projects
Commit f35c647a authored by Vladimir Davydov's avatar Vladimir Davydov Committed by Vladimir Davydov
Browse files

Rename local variables coio -> io

We are going to wrap fd in iostream struct and pass it to all coio
methods. When we do that, the name 'coio' won't make any sense. Let's
switch to 'io'. We do renaming in a separate patch to reduce the blast
radius of the main patch.
parent 4f73a2e4
No related branches found
No related tags found
No related merge requests found
......@@ -320,9 +320,9 @@ apply_row(struct xrow_header *row)
void
applier_connect(struct applier *applier)
{
struct ev_io *coio = &applier->io;
struct ev_io *io = &applier->io;
struct ibuf *ibuf = &applier->ibuf;
if (coio->fd >= 0)
if (io->fd >= 0)
return;
char greetingbuf[IPROTO_GREETING_SIZE];
struct xrow_header row;
......@@ -338,9 +338,9 @@ applier_connect(struct applier *applier)
*/
applier->addr_len = sizeof(applier->addrstorage);
applier_set_state(applier, APPLIER_CONNECT);
coio->fd = coio_connect(uri, &applier->addr, &applier->addr_len);
assert(coio->fd >= 0);
coio_readn(coio, greetingbuf, IPROTO_GREETING_SIZE);
io->fd = coio_connect(uri, &applier->addr, &applier->addr_len);
assert(io->fd >= 0);
coio_readn(io, greetingbuf, IPROTO_GREETING_SIZE);
applier->last_row_time = ev_monotonic_now(loop());
/* Decode instance version and name from greeting */
......@@ -375,8 +375,8 @@ applier_connect(struct applier *applier)
* election on bootstrap.
*/
xrow_encode_vote(&row);
coio_write_xrow(coio, &row);
coio_read_xrow(coio, ibuf, &row);
coio_write_xrow(io, &row);
coio_read_xrow(io, ibuf, &row);
if (row.type == IPROTO_OK) {
xrow_decode_ballot_xc(&row, &applier->ballot);
} else try {
......@@ -406,8 +406,8 @@ applier_connect(struct applier *applier)
uri->login_len,
uri->password != NULL ? uri->password : "",
uri->password_len);
coio_write_xrow(coio, &row);
coio_read_xrow(coio, ibuf, &row);
coio_write_xrow(io, &row);
coio_read_xrow(io, ibuf, &row);
applier->last_row_time = ev_monotonic_now(loop());
if (row.type != IPROTO_OK)
xrow_decode_error_xc(&row); /* auth failed */
......@@ -421,7 +421,7 @@ applier_connect(struct applier *applier)
static uint64_t
applier_wait_snapshot(struct applier *applier)
{
struct ev_io *coio = &applier->io;
struct ev_io *io = &applier->io;
struct ibuf *ibuf = &applier->ibuf;
struct xrow_header row;
......@@ -431,7 +431,7 @@ applier_wait_snapshot(struct applier *applier)
*/
if (applier->version_id >= version_id(1, 7, 0)) {
/* Decode JOIN/FETCH_SNAPSHOT response */
coio_read_xrow(coio, ibuf, &row);
coio_read_xrow(io, ibuf, &row);
if (iproto_type_is_error(row.type)) {
xrow_decode_error_xc(&row); /* re-throw error */
} else if (row.type != IPROTO_OK) {
......@@ -447,11 +447,11 @@ applier_wait_snapshot(struct applier *applier)
xrow_decode_vclock_xc(&row, &replicaset.vclock);
}
coio_read_xrow(coio, ibuf, &row);
coio_read_xrow(io, ibuf, &row);
if (row.type == IPROTO_JOIN_META) {
/* Read additional metadata. Empty at the moment. */
do {
coio_read_xrow(coio, ibuf, &row);
coio_read_xrow(io, ibuf, &row);
if (iproto_type_is_error(row.type)) {
xrow_decode_error_xc(&row);
} else if (iproto_type_is_promote_request(row.type)) {
......@@ -469,7 +469,7 @@ applier_wait_snapshot(struct applier *applier)
(uint32_t)row.type);
}
} while (row.type != IPROTO_JOIN_SNAPSHOT);
coio_read_xrow(coio, ibuf, &row);
coio_read_xrow(io, ibuf, &row);
}
/*
......@@ -501,7 +501,7 @@ applier_wait_snapshot(struct applier *applier)
tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE,
(uint32_t) row.type);
}
coio_read_xrow(coio, ibuf, &row);
coio_read_xrow(io, ibuf, &row);
}
return row_count;
......@@ -511,12 +511,12 @@ static void
applier_fetch_snapshot(struct applier *applier)
{
/* Send FETCH SNAPSHOT request */
struct ev_io *coio = &applier->io;
struct ev_io *io = &applier->io;
struct xrow_header row;
memset(&row, 0, sizeof(row));
row.type = IPROTO_FETCH_SNAPSHOT;
coio_write_xrow(coio, &row);
coio_write_xrow(io, &row);
applier_set_state(applier, APPLIER_FETCH_SNAPSHOT);
applier_wait_snapshot(applier);
......@@ -584,7 +584,7 @@ static void
applier_register(struct applier *applier, bool was_anon)
{
/* Send REGISTER request */
struct ev_io *coio = &applier->io;
struct ev_io *io = &applier->io;
struct xrow_header row;
memset(&row, 0, sizeof(row));
......@@ -594,7 +594,7 @@ applier_register(struct applier *applier, bool was_anon)
*/
xrow_encode_register(&row, &INSTANCE_UUID, box_vclock);
row.type = IPROTO_REGISTER;
coio_write_xrow(coio, &row);
coio_write_xrow(io, &row);
/*
* Register may serve as a retry for final join. Set corresponding
......@@ -616,12 +616,12 @@ static void
applier_join(struct applier *applier)
{
/* Send JOIN request */
struct ev_io *coio = &applier->io;
struct ev_io *io = &applier->io;
struct xrow_header row;
uint64_t row_count;
xrow_encode_join_xc(&row, &INSTANCE_UUID);
coio_write_xrow(coio, &row);
coio_write_xrow(io, &row);
applier_set_state(applier, APPLIER_INITIAL_JOIN);
......@@ -649,7 +649,7 @@ applier_join(struct applier *applier)
static struct applier_tx_row *
applier_read_tx_row(struct applier *applier, double timeout)
{
struct ev_io *coio = &applier->io;
struct ev_io *io = &applier->io;
struct ibuf *ibuf = &applier->ibuf;
size_t size;
struct applier_tx_row *tx_row =
......@@ -662,7 +662,7 @@ applier_read_tx_row(struct applier *applier, double timeout)
ERROR_INJECT_YIELD(ERRINJ_APPLIER_READ_TX_ROW_DELAY);
coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
coio_read_xrow_timeout_xc(io, ibuf, row, timeout);
if (row->tm > 0)
applier->lag = ev_now(loop()) - row->tm;
......@@ -1250,7 +1250,7 @@ static void
applier_subscribe(struct applier *applier)
{
/* Send SUBSCRIBE request */
struct ev_io *coio = &applier->io;
struct ev_io *io = &applier->io;
struct ibuf *ibuf = &applier->ibuf;
struct xrow_header row;
struct tt_uuid cluster_id = uuid_nil;
......@@ -1270,11 +1270,11 @@ applier_subscribe(struct applier *applier)
uint32_t id_filter = box_is_orphan() ? 0 : 1 << instance_id;
xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
&vclock, replication_anon, id_filter);
coio_write_xrow(coio, &row);
coio_write_xrow(io, &row);
/* Read SUBSCRIBE response */
if (applier->version_id >= version_id(1, 6, 7)) {
coio_read_xrow(coio, ibuf, &row);
coio_read_xrow(io, ibuf, &row);
if (iproto_type_is_error(row.type)) {
xrow_decode_error_xc(&row); /* error */
} else if (row.type != IPROTO_OK) {
......
......@@ -36,11 +36,11 @@
#include "msgpuck/msgpuck.h"
void
coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header *row)
coio_read_xrow(struct ev_io *io, struct ibuf *in, struct xrow_header *row)
{
/* Read fixed header */
if (ibuf_used(in) < 1)
coio_breadn(coio, in, 1);
coio_breadn(io, in, 1);
/* Read length */
if (mp_typeof(*in->rpos) != MP_UINT) {
......@@ -49,28 +49,28 @@ coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header *row)
}
ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
if (to_read > 0)
coio_breadn(coio, in, to_read);
coio_breadn(io, in, to_read);
uint32_t len = mp_decode_uint((const char **) &in->rpos);
/* Read header and body */
to_read = len - ibuf_used(in);
if (to_read > 0)
coio_breadn(coio, in, to_read);
coio_breadn(io, in, to_read);
xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len,
true);
}
void
coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
coio_read_xrow_timeout_xc(struct ev_io *io, struct ibuf *in,
struct xrow_header *row, ev_tstamp timeout)
{
ev_tstamp start, delay;
coio_timeout_init(&start, &delay, timeout);
/* Read fixed header */
if (ibuf_used(in) < 1)
coio_breadn_timeout(coio, in, 1, delay);
coio_breadn_timeout(io, in, 1, delay);
coio_timeout_update(&start, &delay);
/* Read length */
......@@ -80,7 +80,7 @@ coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
}
ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
if (to_read > 0)
coio_breadn_timeout(coio, in, to_read, delay);
coio_breadn_timeout(io, in, to_read, delay);
coio_timeout_update(&start, &delay);
uint32_t len = mp_decode_uint((const char **) &in->rpos);
......@@ -88,7 +88,7 @@ coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
/* Read header and body */
to_read = len - ibuf_used(in);
if (to_read > 0)
coio_breadn_timeout(coio, in, to_read, delay);
coio_breadn_timeout(io, in, to_read, delay);
xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len,
true);
......@@ -96,10 +96,10 @@ coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
void
coio_write_xrow(struct ev_io *coio, const struct xrow_header *row)
coio_write_xrow(struct ev_io *io, const struct xrow_header *row)
{
struct iovec iov[XROW_IOVMAX];
int iovcnt = xrow_to_iovec_xc(row, iov);
coio_writev(coio, iov, iovcnt, 0);
coio_writev(io, iov, iovcnt, 0);
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment