Skip to content
Snippets Groups Projects
Commit cb30cc4c authored by Serge Petrenko's avatar Serge Petrenko Committed by Kirill Yukhin
Browse files

applier: extract tx boundary checks from applier_read_tx into a separate routine

Introduce a new routine, set_next_tx_row(), which checks tx boundary
violation and appends the new row to the current tx in case everything
is ok.

set_next_tx_row() is extracted from applier_read_tx() because it's a
common part of transaction assembly both for recovery and applier.

The only difference for recovery will be that the routine which's
responsible for tx assembly won't read rows. It'll be a callback ran on
each new row being read from WAL.

Prerequisite #5874
Part-of #5566
parent eb908469
No related branches found
No related tags found
No related merge requests found
......@@ -657,6 +657,64 @@ applier_read_tx_row(struct applier *applier)
return tx_row;
}
static int64_t
set_next_tx_row(struct stailq *rows, struct applier_tx_row *tx_row, int64_t tsn)
{
struct xrow_header *row = &tx_row->row;
if (iproto_type_is_error(row->type))
xrow_decode_error_xc(row);
/* Replication request. */
if (row->replica_id >= VCLOCK_MAX) {
/*
* A safety net, this can only occur if we're fed a strangely
* broken xlog. row->replica_id == 0, when reading heartbeats
* from an anonymous instance.
*/
tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
int2str(row->replica_id),
tt_uuid_str(&REPLICASET_UUID));
}
if (tsn == 0) {
/*
* Transaction id must be derived from the log sequence number
* of the first row in the transaction.
*/
tsn = row->tsn;
if (row->lsn != tsn)
tnt_raise(ClientError, ER_PROTOCOL,
"Transaction id must be equal to LSN of the "
"first row in the transaction.");
} else if (tsn != row->tsn) {
tnt_raise(ClientError, ER_UNSUPPORTED, "replication",
"interleaving transactions");
}
assert(row->bodycnt <= 1);
if (row->is_commit) {
/* Signal the caller that we've reached the tx end. */
tsn = 0;
} else if (row->bodycnt == 1) {
/*
* Save row body to gc region. Not done for single-statement
* transactions and the last row of multi-statement transactions
* knowing that the input buffer will not be used while the
* transaction is applied.
*/
void *new_base = region_alloc(&fiber()->gc, row->body->iov_len);
if (new_base == NULL)
tnt_raise(OutOfMemory, row->body->iov_len, "region",
"xrow body");
memcpy(new_base, row->body->iov_base, row->body->iov_len);
/* Adjust row body pointers. */
row->body->iov_base = new_base;
}
stailq_add_tail(rows, &tx_row->next);
return tsn;
}
/**
* Read one transaction from network using applier's input buffer.
* Transaction rows are placed onto fiber gc region.
......@@ -672,63 +730,8 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
stailq_create(rows);
do {
struct applier_tx_row *tx_row = applier_read_tx_row(applier);
struct xrow_header *row = &tx_row->row;
if (iproto_type_is_error(row->type))
xrow_decode_error_xc(row);
/* Replication request. */
if (row->replica_id >= VCLOCK_MAX) {
/*
* A safety net, this can only occur
* if we're fed a strangely broken xlog.
* row->replica_id == 0, when reading
* heartbeats from an anonymous instance.
*/
tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
int2str(row->replica_id),
tt_uuid_str(&REPLICASET_UUID));
}
if (tsn == 0) {
/*
* Transaction id must be derived from the log sequence
* number of the first row in the transaction.
*/
tsn = row->tsn;
if (row->lsn != tsn)
tnt_raise(ClientError, ER_PROTOCOL,
"Transaction id must be equal to "
"LSN of the first row in the "
"transaction.");
}
if (tsn != row->tsn)
tnt_raise(ClientError, ER_UNSUPPORTED,
"replication",
"interleaving transactions");
assert(row->bodycnt <= 1);
if (row->bodycnt == 1 && !row->is_commit) {
/*
* Save row body to gc region.
* Not done for single-statement
* transactions knowing that the input
* buffer will not be used while the
* transaction is applied.
*/
void *new_base = region_alloc(&fiber()->gc,
row->body->iov_len);
if (new_base == NULL)
tnt_raise(OutOfMemory, row->body->iov_len,
"region", "xrow body");
memcpy(new_base, row->body->iov_base,
row->body->iov_len);
/* Adjust row body pointers. */
row->body->iov_base = new_base;
}
stailq_add_tail(rows, &tx_row->next);
} while (!stailq_last_entry(rows, struct applier_tx_row,
next)->row.is_commit);
tsn = set_next_tx_row(rows, tx_row, tsn);
} while (tsn != 0);
}
static void
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment