Skip to content
Snippets Groups Projects
Commit 5d6e71e3 authored by Cyrill Gorcunov's avatar Cyrill Gorcunov Committed by Kirill Yukhin
Browse files

box/txn: move journal allocation into separate routine


This makes code easier to read and allows to reuse
txn allocation in sync\async writes.

Acked-by: default avatarKonstantin Osipov <kostja.osipov@gmail.com>
Signed-off-by: default avatarCyrill Gorcunov <gorcunov@gmail.com>
parent 76a2cb04
No related branches found
No related tags found
No related merge requests found
......@@ -478,41 +478,49 @@ txn_complete_async(struct journal_entry *entry, void *complete_data)
fiber_set_txn(fiber(), NULL);
}
static int64_t
txn_write_to_wal(struct txn *txn)
static struct journal_entry *
txn_journal_entry_new(struct txn *txn)
{
struct journal_entry *req;
struct txn_stmt *stmt;
assert(txn->n_new_rows + txn->n_applier_rows > 0);
/* Prepare a journal entry. */
struct journal_entry *req = journal_entry_new(txn->n_new_rows +
txn->n_applier_rows,
&txn->region,
txn_complete_async,
txn);
if (req == NULL) {
txn_rollback(txn);
return -1;
}
req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows,
&txn->region, txn_complete_async, txn);
if (req == NULL)
return NULL;
struct txn_stmt *stmt;
struct xrow_header **remote_row = req->rows;
struct xrow_header **local_row = req->rows + txn->n_applier_rows;
stailq_foreach_entry(stmt, &txn->stmts, next) {
if (stmt->has_triggers) {
txn_init_triggers(txn);
rlist_splice(&txn->on_commit, &stmt->on_commit);
}
/* A read (e.g. select) request */
if (stmt->row == NULL)
continue; /* A read (e.g. select) request */
continue;
if (stmt->row->replica_id == 0)
*local_row++ = stmt->row;
else
*remote_row++ = stmt->row;
req->approx_len += xrow_approx_len(stmt->row);
}
assert(remote_row == req->rows + txn->n_applier_rows);
assert(local_row == remote_row + txn->n_new_rows);
return req;
}
static int64_t
txn_write_to_wal(struct journal_entry *req)
{
/*
* Send the entry to the journal.
*
......@@ -584,6 +592,8 @@ txn_commit_nop(struct txn *txn)
int
txn_commit_async(struct txn *txn)
{
struct journal_entry *req;
if (txn_prepare(txn) != 0) {
txn_rollback(txn);
return -1;
......@@ -592,12 +602,20 @@ txn_commit_async(struct txn *txn)
if (txn_commit_nop(txn))
return 0;
return txn_write_to_wal(txn);
req = txn_journal_entry_new(txn);
if (req == NULL) {
txn_rollback(txn);
return -1;
}
return txn_write_to_wal(req);
}
int
txn_commit(struct txn *txn)
{
struct journal_entry *req;
txn->fiber = fiber();
if (txn_prepare(txn) != 0) {
......@@ -611,7 +629,14 @@ txn_commit(struct txn *txn)
return 0;
}
if (txn_write_to_wal(txn) != 0)
req = txn_journal_entry_new(txn);
if (req == NULL) {
txn_rollback(txn);
txn_free(txn);
return -1;
}
if (txn_write_to_wal(req) != 0)
return -1;
/*
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment