diff --git a/src/box/applier.cc b/src/box/applier.cc index dabc05e9c642d8f4570d515890ffe43d896f75b2..f00ffbd34da693af83f3033f93dde1ea6ad18126 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -59,6 +59,13 @@ STRS(applier_state, applier_STATE); +enum { + /** + * How often to log received row count. Used during join and register. + */ + ROWS_PER_LOG = 100000, +}; + static inline void applier_set_state(struct applier *applier, enum applier_state state) { @@ -292,34 +299,6 @@ apply_row(struct xrow_header *row) return 0; } -static int -apply_final_join_row(struct xrow_header *row) -{ - /* - * Confirms are ignored during join. All the data master - * sends us is valid. - */ - if (iproto_type_is_synchro_request(row->type)) - return 0; - struct txn *txn = txn_begin(); - if (txn == NULL) - return -1; - /* - * Do not wait for confirmation while processing final - * join rows. See apply_snapshot_row(). - */ - txn_set_flags(txn, TXN_FORCE_ASYNC); - if (apply_row(row) != 0) { - txn_rollback(txn); - fiber_gc(); - return -1; - } - if (txn_commit(txn) != 0) - return -1; - fiber_gc(); - return 0; -} - /** * Connect to a remote host and authenticate the client. */ @@ -463,7 +442,7 @@ applier_wait_snapshot(struct applier *applier) if (iproto_type_is_dml(row.type)) { if (apply_snapshot_row(&row) != 0) diag_raise(); - if (++row_count % 100000 == 0) + if (++row_count % ROWS_PER_LOG == 0) say_info("%.1fM rows received", row_count / 1e6); } else if (row.type == IPROTO_OK) { if (applier->version_id < version_id(1, 7, 0)) { @@ -506,12 +485,24 @@ applier_fetch_snapshot(struct applier *applier) } static uint64_t -applier_wait_register(struct applier *applier, uint64_t row_count) -{ - struct ev_io *coio = &applier->io; - struct ibuf *ibuf = &applier->ibuf; +applier_read_tx(struct applier *applier, struct stailq *rows, double timeout); + +static int +apply_final_join_tx(struct stailq *rows); + +/** + * A helper struct to link xrow objects in a list. + */ +struct applier_tx_row { + /* Next transaction row. */ + struct stailq_entry next; + /* xrow_header struct for the current transaction row. */ struct xrow_header row; +}; +static uint64_t +applier_wait_register(struct applier *applier, uint64_t row_count) +{ /* * Tarantool < 1.7.0: there is no "final join" stage. * Proceed to "subscribe" and do not finish bootstrap @@ -520,31 +511,30 @@ applier_wait_register(struct applier *applier, uint64_t row_count) if (applier->version_id < version_id(1, 7, 0)) return row_count; + uint64_t next_log_cnt = + row_count + ROWS_PER_LOG - row_count % ROWS_PER_LOG; /* * Receive final data. */ while (true) { - coio_read_xrow(coio, ibuf, &row); - applier->last_row_time = ev_monotonic_now(loop()); - if (iproto_type_is_dml(row.type)) { - vclock_follow_xrow(&replicaset.vclock, &row); - if (apply_final_join_row(&row) != 0) - diag_raise(); - if (++row_count % 100000 == 0) - say_info("%.1fM rows received", row_count / 1e6); - } else if (row.type == IPROTO_OK) { - /* - * Current vclock. This is not used now, - * ignore. - */ - ++row_count; - break; /* end of stream */ - } else if (iproto_type_is_error(row.type)) { - xrow_decode_error_xc(&row); /* rethrow error */ - } else { - tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE, - (uint32_t) row.type); + struct stailq rows; + row_count += applier_read_tx(applier, &rows, TIMEOUT_INFINITY); + while (row_count >= next_log_cnt) { + say_info("%.1fM rows received", next_log_cnt / 1e6); + next_log_cnt += ROWS_PER_LOG; } + struct xrow_header *first_row = + &stailq_first_entry(&rows, struct applier_tx_row, + next)->row; + if (first_row->type == IPROTO_OK) { + /* Current vclock. This is not used now, ignore. */ + assert(first_row == + &stailq_last_entry(&rows, struct applier_tx_row, + next)->row); + break; + } + if (apply_final_join_tx(&rows) != 0) + diag_raise(); } return row_count; @@ -616,18 +606,8 @@ applier_join(struct applier *applier) applier_set_state(applier, APPLIER_READY); } -/** - * A helper struct to link xrow objects in a list. - */ -struct applier_tx_row { - /* Next transaction row. */ - struct stailq_entry next; - /* xrow_header struct for the current transaction row. */ - struct xrow_header row; -}; - static struct applier_tx_row * -applier_read_tx_row(struct applier *applier) +applier_read_tx_row(struct applier *applier, double timeout) { struct ev_io *coio = &applier->io; struct ibuf *ibuf = &applier->ibuf; @@ -640,17 +620,7 @@ applier_read_tx_row(struct applier *applier) struct xrow_header *row = &tx_row->row; - double timeout = replication_disconnect_timeout(); - /* - * Tarantool < 1.7.7 does not send periodic heartbeat - * messages so we can't assume that if we haven't heard - * from the master for quite a while the connection is - * broken - the master might just be idle. - */ - if (applier->version_id < version_id(1, 7, 7)) - coio_read_xrow(coio, ibuf, row); - else - coio_read_xrow_timeout_xc(coio, ibuf, row, timeout); + coio_read_xrow_timeout_xc(coio, ibuf, row, timeout); applier->lag = ev_now(loop()) - row->tm; applier->last_row_time = ev_monotonic_now(loop()); @@ -722,16 +692,20 @@ set_next_tx_row(struct stailq *rows, struct applier_tx_row *tx_row, int64_t tsn) * rpos is adjusted as xrow is decoded and the corresponding * network input space is reused for the next xrow. */ -static void -applier_read_tx(struct applier *applier, struct stailq *rows) +static uint64_t +applier_read_tx(struct applier *applier, struct stailq *rows, double timeout) { int64_t tsn = 0; + uint64_t row_count = 0; stailq_create(rows); do { - struct applier_tx_row *tx_row = applier_read_tx_row(applier); + struct applier_tx_row *tx_row = applier_read_tx_row(applier, + timeout); tsn = set_next_tx_row(rows, tx_row, tsn); + ++row_count; } while (tsn != 0); + return row_count; } static void @@ -987,6 +961,26 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers) return -1; } +/** A simpler version of applier_apply_tx() for final join stage. */ +static int +apply_final_join_tx(struct stailq *rows) +{ + struct xrow_header *first_row = + &stailq_first_entry(rows, struct applier_tx_row, next)->row; + struct xrow_header *last_row = + &stailq_last_entry(rows, struct applier_tx_row, next)->row; + int rc = 0; + /* WAL isn't enabled yet, so follow vclock manually. */ + vclock_follow_xrow(&replicaset.vclock, last_row); + if (unlikely(iproto_type_is_synchro_request(first_row->type))) { + assert(first_row == last_row); + rc = apply_synchro_row(first_row); + } else { + rc = apply_plain_tx(rows, false, false); + } + fiber_gc(); + return rc; +} /** * Apply all rows in the rows queue as a single transaction. @@ -1250,8 +1244,18 @@ applier_subscribe(struct applier *applier) applier_set_state(applier, APPLIER_FOLLOW); } + /* + * Tarantool < 1.7.7 does not send periodic heartbeat + * messages so we can't assume that if we haven't heard + * from the master for quite a while the connection is + * broken - the master might just be idle. + */ + double timeout = applier->version_id < version_id(1, 7, 7) ? + TIMEOUT_INFINITY : + replication_disconnect_timeout(); + struct stailq rows; - applier_read_tx(applier, &rows); + applier_read_tx(applier, &rows, timeout); /* * In case of an heartbeat message wake a writer up