From 204b238447353a51f6f20c551f3e23f99ceef2fd Mon Sep 17 00:00:00 2001 From: Konstantin Osipov <kostja@tarantool.org> Date: Thu, 28 Mar 2019 11:09:58 +0300 Subject: [PATCH] Rename txn->{n_local_rows,n_remote_rows} We need to keep count of different kind of rows in a transaction: - rows which already exist in some WAL and are replayed locally. These used to be called n_remote_rows, and renamed to n_applier_rows - rows which were created locally, on this server (previously called n_local_rows, renamed to n_new_rows). Of the latter, we need to distinguish between GROUP_ID=LOCAL rows, i.e. rows which need not be replicated, and GROUP_ID=REPLICA rows, which need to be replicated. For example, a remote transaction can fire local triggers which generate local rows. If these triggers generate rows which need to be replicated, the transaction has to be aborted. In a subsequent patch I plan to add n_local_rows, which tracks the number of new rows with GROUP_ID=local and use it in txn_is_distributed() check. --- src/box/txn.c | 34 +++++++++++++++++----------------- src/box/txn.h | 11 +++++++---- 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/src/box/txn.c b/src/box/txn.c index 8ed999435d..0df99b7cd7 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -119,12 +119,12 @@ txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp) if (txn->engine != NULL && stmt->space != NULL) engine_rollback_statement(txn->engine, txn, stmt); if (stmt->row != NULL && stmt->row->replica_id == 0) { - assert(txn->n_local_rows > 0); - txn->n_local_rows--; + assert(txn->n_new_rows > 0); + txn->n_new_rows--; } if (stmt->row != NULL && stmt->row->replica_id != 0) { - assert(txn->n_remote_rows > 0); - txn->n_remote_rows--; + assert(txn->n_applier_rows > 0); + txn->n_applier_rows--; } txn_stmt_unref_tuples(stmt); stmt->space = NULL; @@ -144,8 +144,8 @@ txn_begin(bool is_autocommit) } /* Initialize members explicitly to save time on memset() */ stailq_create(&txn->stmts); - txn->n_local_rows = 0; - txn->n_remote_rows = 0; + txn->n_new_rows = 0; + txn->n_applier_rows = 0; txn->is_autocommit = is_autocommit; txn->has_triggers = false; txn->is_aborted = false; @@ -221,7 +221,7 @@ bool txn_is_distributed(struct txn *txn) { assert(txn == in_txn()); - if (txn->n_local_rows == 0 || txn->n_remote_rows == 0) + if (txn->n_new_rows == 0 || txn->n_applier_rows == 0) return false; struct txn_stmt *stmt; /* Search for new non local group rows. */ @@ -254,9 +254,9 @@ txn_commit_stmt(struct txn *txn, struct request *request) goto fail; assert(stmt->row != NULL); if (stmt->row->replica_id == 0) - ++txn->n_local_rows; + ++txn->n_new_rows; else - ++txn->n_remote_rows; + ++txn->n_applier_rows; } /* * If there are triggers, and they are not disabled, and @@ -287,16 +287,16 @@ txn_commit_stmt(struct txn *txn, struct request *request) static int64_t txn_write_to_wal(struct txn *txn) { - assert(txn->n_local_rows + txn->n_remote_rows > 0); + assert(txn->n_new_rows + txn->n_applier_rows > 0); - struct journal_entry *req = journal_entry_new(txn->n_local_rows + - txn->n_remote_rows); + struct journal_entry *req = journal_entry_new(txn->n_new_rows + + txn->n_applier_rows); if (req == NULL) return -1; struct txn_stmt *stmt; struct xrow_header **remote_row = req->rows; - struct xrow_header **local_row = req->rows + txn->n_remote_rows; + struct xrow_header **local_row = req->rows + txn->n_applier_rows; stailq_foreach_entry(stmt, &txn->stmts, next) { if (stmt->row == NULL) continue; /* A read (e.g. select) request */ @@ -306,8 +306,8 @@ txn_write_to_wal(struct txn *txn) *remote_row++ = stmt->row; req->approx_len += xrow_approx_len(stmt->row); } - assert(remote_row == req->rows + txn->n_remote_rows); - assert(local_row == remote_row + txn->n_local_rows); + assert(remote_row == req->rows + txn->n_applier_rows); + assert(local_row == remote_row + txn->n_new_rows); ev_tstamp start = ev_monotonic_now(loop()); int64_t res = journal_write(req); @@ -325,7 +325,7 @@ txn_write_to_wal(struct txn *txn) diag_set(ClientError, ER_WAL_IO); diag_log(); } else if (stop - start > too_long_threshold) { - int n_rows = txn->n_local_rows + txn->n_remote_rows; + int n_rows = txn->n_new_rows + txn->n_applier_rows; say_warn_ratelimited("too long WAL write: %d rows at " "LSN %lld: %.3f sec", n_rows, res - n_rows + 1, stop - start); @@ -361,7 +361,7 @@ txn_commit(struct txn *txn) goto fail; } - if (txn->n_local_rows + txn->n_remote_rows > 0) { + if (txn->n_new_rows + txn->n_applier_rows > 0) { txn->signature = txn_write_to_wal(txn); if (txn->signature < 0) goto fail; diff --git a/src/box/txn.h b/src/box/txn.h index 56260c2f7a..397d845d91 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -140,10 +140,13 @@ struct txn { int64_t id; /** List of statements in a transaction. */ struct stailq stmts; - /** Number of new rows without an assigned lsn. */ - int n_local_rows; - /** Number of rows with an already assigned lsn. */ - int n_remote_rows; + /** Number of new rows without an assigned LSN. */ + int n_new_rows; + /** + * Number of rows coming from the applier, with an + * already assigned LSN. + */ + int n_applier_rows; /** * True if this transaction is running in autocommit mode * (statement end causes an automatic transaction commit). -- GitLab