diff --git a/src/box/txn.c b/src/box/txn.c index 7900fb3ab2b6d2598ef976deb234b0d60e2dbdf1..deb4fac478de5ff3908e2a1e47cf8c0406794f0b 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -118,9 +118,13 @@ txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp) stailq_foreach_entry(stmt, &rollback, next) { if (txn->engine != NULL && stmt->space != NULL) engine_rollback_statement(txn->engine, txn, stmt); - if (stmt->row != NULL) { - assert(txn->n_rows > 0); - txn->n_rows--; + if (stmt->row != NULL && stmt->row->replica_id == 0) { + assert(txn->n_local_rows > 0); + txn->n_local_rows--; + } + if (stmt->row != NULL && stmt->row->replica_id != 0) { + assert(txn->n_remote_rows > 0); + txn->n_remote_rows--; } txn_stmt_unref_tuples(stmt); stmt->space = NULL; @@ -140,7 +144,8 @@ txn_begin(bool is_autocommit) } /* Initialize members explicitly to save time on memset() */ stailq_create(&txn->stmts); - txn->n_rows = 0; + txn->n_local_rows = 0; + txn->n_remote_rows = 0; txn->is_autocommit = is_autocommit; txn->has_triggers = false; txn->is_aborted = false; @@ -233,7 +238,11 @@ txn_commit_stmt(struct txn *txn, struct request *request) if (stmt->space == NULL || !space_is_temporary(stmt->space)) { if (txn_add_redo(stmt, request) != 0) goto fail; - ++txn->n_rows; + assert(stmt->row != NULL); + if (stmt->row->replica_id == 0) + ++txn->n_local_rows; + else + ++txn->n_remote_rows; } /* * If there are triggers, and they are not disabled, and @@ -264,21 +273,27 @@ txn_commit_stmt(struct txn *txn, struct request *request) static int64_t txn_write_to_wal(struct txn *txn) { - assert(txn->n_rows > 0); + assert(txn->n_local_rows + txn->n_remote_rows > 0); - struct journal_entry *req = journal_entry_new(txn->n_rows); + struct journal_entry *req = journal_entry_new(txn->n_local_rows + + txn->n_remote_rows); if (req == NULL) return -1; struct txn_stmt *stmt; - struct xrow_header **row = req->rows; + struct xrow_header **remote_row = req->rows; + struct xrow_header **local_row = req->rows + txn->n_remote_rows; stailq_foreach_entry(stmt, &txn->stmts, next) { if (stmt->row == NULL) continue; /* A read (e.g. select) request */ - *row++ = stmt->row; + if (stmt->row->replica_id == 0) + *local_row++ = stmt->row; + else + *remote_row++ = stmt->row; req->approx_len += xrow_approx_len(stmt->row); } - assert(row == req->rows + req->n_rows); + assert(remote_row == req->rows + txn->n_remote_rows); + assert(local_row == remote_row + txn->n_local_rows); ev_tstamp start = ev_monotonic_now(loop()); int64_t res = journal_write(req); @@ -296,9 +311,10 @@ txn_write_to_wal(struct txn *txn) diag_set(ClientError, ER_WAL_IO); diag_log(); } else if (stop - start > too_long_threshold) { + int n_rows = txn->n_local_rows + txn->n_remote_rows; say_warn_ratelimited("too long WAL write: %d rows at " - "LSN %lld: %.3f sec", txn->n_rows, - res - txn->n_rows + 1, stop - start); + "LSN %lld: %.3f sec", n_rows, + res - n_rows + 1, stop - start); } /* * Use vclock_sum() from WAL writer as transaction signature. @@ -331,7 +347,7 @@ txn_commit(struct txn *txn) goto fail; } - if (txn->n_rows > 0) { + if (txn->n_local_rows + txn->n_remote_rows > 0) { txn->signature = txn_write_to_wal(txn); if (txn->signature < 0) goto fail; diff --git a/src/box/txn.h b/src/box/txn.h index de5cb0de41af4d46a55d8c58568b1ad70235cce3..c9829da9ec09340fe2e3b148d2794eb08c44722c 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -140,8 +140,10 @@ struct txn { int64_t id; /** List of statements in a transaction. */ struct stailq stmts; - /** Total number of WAL rows in this txn. */ - int n_rows; + /** Number of new rows without an assigned lsn. */ + int n_local_rows; + /** Number of rows with an already assigned lsn. */ + int n_remote_rows; /** * True if this transaction is running in autocommit mode * (statement end causes an automatic transaction commit).