diff --git a/src/box/applier.cc b/src/box/applier.cc index 46f0ab51693e6fc9591e378ea187810861bd3806..34e94cbaa65c40ad0dec468cba2cf4780d6c2245 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -320,9 +320,9 @@ apply_row(struct xrow_header *row) void applier_connect(struct applier *applier) { - struct ev_io *coio = &applier->io; + struct ev_io *io = &applier->io; struct ibuf *ibuf = &applier->ibuf; - if (coio->fd >= 0) + if (io->fd >= 0) return; char greetingbuf[IPROTO_GREETING_SIZE]; struct xrow_header row; @@ -338,9 +338,9 @@ applier_connect(struct applier *applier) */ applier->addr_len = sizeof(applier->addrstorage); applier_set_state(applier, APPLIER_CONNECT); - coio->fd = coio_connect(uri, &applier->addr, &applier->addr_len); - assert(coio->fd >= 0); - coio_readn(coio, greetingbuf, IPROTO_GREETING_SIZE); + io->fd = coio_connect(uri, &applier->addr, &applier->addr_len); + assert(io->fd >= 0); + coio_readn(io, greetingbuf, IPROTO_GREETING_SIZE); applier->last_row_time = ev_monotonic_now(loop()); /* Decode instance version and name from greeting */ @@ -375,8 +375,8 @@ applier_connect(struct applier *applier) * election on bootstrap. */ xrow_encode_vote(&row); - coio_write_xrow(coio, &row); - coio_read_xrow(coio, ibuf, &row); + coio_write_xrow(io, &row); + coio_read_xrow(io, ibuf, &row); if (row.type == IPROTO_OK) { xrow_decode_ballot_xc(&row, &applier->ballot); } else try { @@ -406,8 +406,8 @@ applier_connect(struct applier *applier) uri->login_len, uri->password != NULL ? uri->password : "", uri->password_len); - coio_write_xrow(coio, &row); - coio_read_xrow(coio, ibuf, &row); + coio_write_xrow(io, &row); + coio_read_xrow(io, ibuf, &row); applier->last_row_time = ev_monotonic_now(loop()); if (row.type != IPROTO_OK) xrow_decode_error_xc(&row); /* auth failed */ @@ -421,7 +421,7 @@ applier_connect(struct applier *applier) static uint64_t applier_wait_snapshot(struct applier *applier) { - struct ev_io *coio = &applier->io; + struct ev_io *io = &applier->io; struct ibuf *ibuf = &applier->ibuf; struct xrow_header row; @@ -431,7 +431,7 @@ applier_wait_snapshot(struct applier *applier) */ if (applier->version_id >= version_id(1, 7, 0)) { /* Decode JOIN/FETCH_SNAPSHOT response */ - coio_read_xrow(coio, ibuf, &row); + coio_read_xrow(io, ibuf, &row); if (iproto_type_is_error(row.type)) { xrow_decode_error_xc(&row); /* re-throw error */ } else if (row.type != IPROTO_OK) { @@ -447,11 +447,11 @@ applier_wait_snapshot(struct applier *applier) xrow_decode_vclock_xc(&row, &replicaset.vclock); } - coio_read_xrow(coio, ibuf, &row); + coio_read_xrow(io, ibuf, &row); if (row.type == IPROTO_JOIN_META) { /* Read additional metadata. Empty at the moment. */ do { - coio_read_xrow(coio, ibuf, &row); + coio_read_xrow(io, ibuf, &row); if (iproto_type_is_error(row.type)) { xrow_decode_error_xc(&row); } else if (iproto_type_is_promote_request(row.type)) { @@ -469,7 +469,7 @@ applier_wait_snapshot(struct applier *applier) (uint32_t)row.type); } } while (row.type != IPROTO_JOIN_SNAPSHOT); - coio_read_xrow(coio, ibuf, &row); + coio_read_xrow(io, ibuf, &row); } /* @@ -501,7 +501,7 @@ applier_wait_snapshot(struct applier *applier) tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE, (uint32_t) row.type); } - coio_read_xrow(coio, ibuf, &row); + coio_read_xrow(io, ibuf, &row); } return row_count; @@ -511,12 +511,12 @@ static void applier_fetch_snapshot(struct applier *applier) { /* Send FETCH SNAPSHOT request */ - struct ev_io *coio = &applier->io; + struct ev_io *io = &applier->io; struct xrow_header row; memset(&row, 0, sizeof(row)); row.type = IPROTO_FETCH_SNAPSHOT; - coio_write_xrow(coio, &row); + coio_write_xrow(io, &row); applier_set_state(applier, APPLIER_FETCH_SNAPSHOT); applier_wait_snapshot(applier); @@ -584,7 +584,7 @@ static void applier_register(struct applier *applier, bool was_anon) { /* Send REGISTER request */ - struct ev_io *coio = &applier->io; + struct ev_io *io = &applier->io; struct xrow_header row; memset(&row, 0, sizeof(row)); @@ -594,7 +594,7 @@ applier_register(struct applier *applier, bool was_anon) */ xrow_encode_register(&row, &INSTANCE_UUID, box_vclock); row.type = IPROTO_REGISTER; - coio_write_xrow(coio, &row); + coio_write_xrow(io, &row); /* * Register may serve as a retry for final join. Set corresponding @@ -616,12 +616,12 @@ static void applier_join(struct applier *applier) { /* Send JOIN request */ - struct ev_io *coio = &applier->io; + struct ev_io *io = &applier->io; struct xrow_header row; uint64_t row_count; xrow_encode_join_xc(&row, &INSTANCE_UUID); - coio_write_xrow(coio, &row); + coio_write_xrow(io, &row); applier_set_state(applier, APPLIER_INITIAL_JOIN); @@ -649,7 +649,7 @@ applier_join(struct applier *applier) static struct applier_tx_row * applier_read_tx_row(struct applier *applier, double timeout) { - struct ev_io *coio = &applier->io; + struct ev_io *io = &applier->io; struct ibuf *ibuf = &applier->ibuf; size_t size; struct applier_tx_row *tx_row = @@ -662,7 +662,7 @@ applier_read_tx_row(struct applier *applier, double timeout) ERROR_INJECT_YIELD(ERRINJ_APPLIER_READ_TX_ROW_DELAY); - coio_read_xrow_timeout_xc(coio, ibuf, row, timeout); + coio_read_xrow_timeout_xc(io, ibuf, row, timeout); if (row->tm > 0) applier->lag = ev_now(loop()) - row->tm; @@ -1250,7 +1250,7 @@ static void applier_subscribe(struct applier *applier) { /* Send SUBSCRIBE request */ - struct ev_io *coio = &applier->io; + struct ev_io *io = &applier->io; struct ibuf *ibuf = &applier->ibuf; struct xrow_header row; struct tt_uuid cluster_id = uuid_nil; @@ -1270,11 +1270,11 @@ applier_subscribe(struct applier *applier) uint32_t id_filter = box_is_orphan() ? 0 : 1 << instance_id; xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID, &vclock, replication_anon, id_filter); - coio_write_xrow(coio, &row); + coio_write_xrow(io, &row); /* Read SUBSCRIBE response */ if (applier->version_id >= version_id(1, 6, 7)) { - coio_read_xrow(coio, ibuf, &row); + coio_read_xrow(io, ibuf, &row); if (iproto_type_is_error(row.type)) { xrow_decode_error_xc(&row); /* error */ } else if (row.type != IPROTO_OK) { diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc index 48707982bd2120db59c3ab1ce89e167074c02a29..0d687732f0bc52e68cc599e4043a2c7b0f6475ac 100644 --- a/src/box/xrow_io.cc +++ b/src/box/xrow_io.cc @@ -36,11 +36,11 @@ #include "msgpuck/msgpuck.h" void -coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header *row) +coio_read_xrow(struct ev_io *io, struct ibuf *in, struct xrow_header *row) { /* Read fixed header */ if (ibuf_used(in) < 1) - coio_breadn(coio, in, 1); + coio_breadn(io, in, 1); /* Read length */ if (mp_typeof(*in->rpos) != MP_UINT) { @@ -49,28 +49,28 @@ coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header *row) } ssize_t to_read = mp_check_uint(in->rpos, in->wpos); if (to_read > 0) - coio_breadn(coio, in, to_read); + coio_breadn(io, in, to_read); uint32_t len = mp_decode_uint((const char **) &in->rpos); /* Read header and body */ to_read = len - ibuf_used(in); if (to_read > 0) - coio_breadn(coio, in, to_read); + coio_breadn(io, in, to_read); xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len, true); } void -coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in, +coio_read_xrow_timeout_xc(struct ev_io *io, struct ibuf *in, struct xrow_header *row, ev_tstamp timeout) { ev_tstamp start, delay; coio_timeout_init(&start, &delay, timeout); /* Read fixed header */ if (ibuf_used(in) < 1) - coio_breadn_timeout(coio, in, 1, delay); + coio_breadn_timeout(io, in, 1, delay); coio_timeout_update(&start, &delay); /* Read length */ @@ -80,7 +80,7 @@ coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in, } ssize_t to_read = mp_check_uint(in->rpos, in->wpos); if (to_read > 0) - coio_breadn_timeout(coio, in, to_read, delay); + coio_breadn_timeout(io, in, to_read, delay); coio_timeout_update(&start, &delay); uint32_t len = mp_decode_uint((const char **) &in->rpos); @@ -88,7 +88,7 @@ coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in, /* Read header and body */ to_read = len - ibuf_used(in); if (to_read > 0) - coio_breadn_timeout(coio, in, to_read, delay); + coio_breadn_timeout(io, in, to_read, delay); xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len, true); @@ -96,10 +96,10 @@ coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in, void -coio_write_xrow(struct ev_io *coio, const struct xrow_header *row) +coio_write_xrow(struct ev_io *io, const struct xrow_header *row) { struct iovec iov[XROW_IOVMAX]; int iovcnt = xrow_to_iovec_xc(row, iov); - coio_writev(coio, iov, iovcnt, 0); + coio_writev(io, iov, iovcnt, 0); }