Skip to content
Snippets Groups Projects
Commit 27283deb authored by Georgy Kirichenko's avatar Georgy Kirichenko Committed by Vladimir Davydov
Browse files

txn: move rows without lsn to the transaction tail

Form a separate transaction with local changes in case of replication.
This is important because we should be able to replicate such changes
(e.g. made within an on_replace trigger) back. In the opposite case
local changes will be incorporated into originating transaction and
would be skipped by the originator replica.

Needed for #2798
parent 19b5fb1c
No related branches found
No related tags found
No related merge requests found
...@@ -118,9 +118,13 @@ txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp) ...@@ -118,9 +118,13 @@ txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp)
stailq_foreach_entry(stmt, &rollback, next) { stailq_foreach_entry(stmt, &rollback, next) {
if (txn->engine != NULL && stmt->space != NULL) if (txn->engine != NULL && stmt->space != NULL)
engine_rollback_statement(txn->engine, txn, stmt); engine_rollback_statement(txn->engine, txn, stmt);
if (stmt->row != NULL) { if (stmt->row != NULL && stmt->row->replica_id == 0) {
assert(txn->n_rows > 0); assert(txn->n_local_rows > 0);
txn->n_rows--; txn->n_local_rows--;
}
if (stmt->row != NULL && stmt->row->replica_id != 0) {
assert(txn->n_remote_rows > 0);
txn->n_remote_rows--;
} }
txn_stmt_unref_tuples(stmt); txn_stmt_unref_tuples(stmt);
stmt->space = NULL; stmt->space = NULL;
...@@ -140,7 +144,8 @@ txn_begin(bool is_autocommit) ...@@ -140,7 +144,8 @@ txn_begin(bool is_autocommit)
} }
/* Initialize members explicitly to save time on memset() */ /* Initialize members explicitly to save time on memset() */
stailq_create(&txn->stmts); stailq_create(&txn->stmts);
txn->n_rows = 0; txn->n_local_rows = 0;
txn->n_remote_rows = 0;
txn->is_autocommit = is_autocommit; txn->is_autocommit = is_autocommit;
txn->has_triggers = false; txn->has_triggers = false;
txn->is_aborted = false; txn->is_aborted = false;
...@@ -233,7 +238,11 @@ txn_commit_stmt(struct txn *txn, struct request *request) ...@@ -233,7 +238,11 @@ txn_commit_stmt(struct txn *txn, struct request *request)
if (stmt->space == NULL || !space_is_temporary(stmt->space)) { if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
if (txn_add_redo(stmt, request) != 0) if (txn_add_redo(stmt, request) != 0)
goto fail; goto fail;
++txn->n_rows; assert(stmt->row != NULL);
if (stmt->row->replica_id == 0)
++txn->n_local_rows;
else
++txn->n_remote_rows;
} }
/* /*
* If there are triggers, and they are not disabled, and * If there are triggers, and they are not disabled, and
...@@ -264,21 +273,27 @@ txn_commit_stmt(struct txn *txn, struct request *request) ...@@ -264,21 +273,27 @@ txn_commit_stmt(struct txn *txn, struct request *request)
static int64_t static int64_t
txn_write_to_wal(struct txn *txn) txn_write_to_wal(struct txn *txn)
{ {
assert(txn->n_rows > 0); assert(txn->n_local_rows + txn->n_remote_rows > 0);
struct journal_entry *req = journal_entry_new(txn->n_rows); struct journal_entry *req = journal_entry_new(txn->n_local_rows +
txn->n_remote_rows);
if (req == NULL) if (req == NULL)
return -1; return -1;
struct txn_stmt *stmt; struct txn_stmt *stmt;
struct xrow_header **row = req->rows; struct xrow_header **remote_row = req->rows;
struct xrow_header **local_row = req->rows + txn->n_remote_rows;
stailq_foreach_entry(stmt, &txn->stmts, next) { stailq_foreach_entry(stmt, &txn->stmts, next) {
if (stmt->row == NULL) if (stmt->row == NULL)
continue; /* A read (e.g. select) request */ continue; /* A read (e.g. select) request */
*row++ = stmt->row; if (stmt->row->replica_id == 0)
*local_row++ = stmt->row;
else
*remote_row++ = stmt->row;
req->approx_len += xrow_approx_len(stmt->row); req->approx_len += xrow_approx_len(stmt->row);
} }
assert(row == req->rows + req->n_rows); assert(remote_row == req->rows + txn->n_remote_rows);
assert(local_row == remote_row + txn->n_local_rows);
ev_tstamp start = ev_monotonic_now(loop()); ev_tstamp start = ev_monotonic_now(loop());
int64_t res = journal_write(req); int64_t res = journal_write(req);
...@@ -296,9 +311,10 @@ txn_write_to_wal(struct txn *txn) ...@@ -296,9 +311,10 @@ txn_write_to_wal(struct txn *txn)
diag_set(ClientError, ER_WAL_IO); diag_set(ClientError, ER_WAL_IO);
diag_log(); diag_log();
} else if (stop - start > too_long_threshold) { } else if (stop - start > too_long_threshold) {
int n_rows = txn->n_local_rows + txn->n_remote_rows;
say_warn_ratelimited("too long WAL write: %d rows at " say_warn_ratelimited("too long WAL write: %d rows at "
"LSN %lld: %.3f sec", txn->n_rows, "LSN %lld: %.3f sec", n_rows,
res - txn->n_rows + 1, stop - start); res - n_rows + 1, stop - start);
} }
/* /*
* Use vclock_sum() from WAL writer as transaction signature. * Use vclock_sum() from WAL writer as transaction signature.
...@@ -331,7 +347,7 @@ txn_commit(struct txn *txn) ...@@ -331,7 +347,7 @@ txn_commit(struct txn *txn)
goto fail; goto fail;
} }
if (txn->n_rows > 0) { if (txn->n_local_rows + txn->n_remote_rows > 0) {
txn->signature = txn_write_to_wal(txn); txn->signature = txn_write_to_wal(txn);
if (txn->signature < 0) if (txn->signature < 0)
goto fail; goto fail;
......
...@@ -140,8 +140,10 @@ struct txn { ...@@ -140,8 +140,10 @@ struct txn {
int64_t id; int64_t id;
/** List of statements in a transaction. */ /** List of statements in a transaction. */
struct stailq stmts; struct stailq stmts;
/** Total number of WAL rows in this txn. */ /** Number of new rows without an assigned lsn. */
int n_rows; int n_local_rows;
/** Number of rows with an already assigned lsn. */
int n_remote_rows;
/** /**
* True if this transaction is running in autocommit mode * True if this transaction is running in autocommit mode
* (statement end causes an automatic transaction commit). * (statement end causes an automatic transaction commit).
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment