diff --git a/src/box/wal.cc b/src/box/wal.cc index bd15c6cb60b8ff0a01213276eb1adc183c9a4ac5..79fb5985a3f9dce50486699859cebb2812b5af4a 100644 --- a/src/box/wal.cc +++ b/src/box/wal.cc @@ -145,7 +145,7 @@ wal_writer_init(struct wal_writer *writer, struct vclock *vclock, writer->rows_per_wal = rows_per_wal; - writer->batch = fio_batch_alloc(sysconf(_SC_IOV_MAX)); + writer->batch = fio_batch_new(); if (writer->batch == NULL) panic_syserror("fio_batch_alloc"); @@ -161,7 +161,7 @@ wal_writer_destroy(struct wal_writer *writer) { cpipe_destroy(&writer->tx_pipe); cbus_destroy(&writer->tx_wal_bus); - free(writer->batch); + fio_batch_delete(writer->batch); } /** WAL writer thread routine. */ @@ -276,66 +276,17 @@ wal_opt_rotate(struct xlog **wal, struct recovery_state *r, return l ? 0 : -1; } -static struct wal_request * -wal_fill_batch(struct xlog *wal, struct fio_batch *batch, int rows_per_wal, - struct wal_request *req) -{ - int max_rows = rows_per_wal - wal->rows; - /* Post-condition of successful wal_opt_rotate(). */ - assert(max_rows > 0); - fio_batch_start(batch, max_rows); - - while (req != NULL && batch->rows < batch->max_rows) { - int iovcnt = 0; - struct iovec *iov; - struct xrow_header **row = req->rows; - for (; row < req->rows + req->n_rows; row++) { - iov = fio_batch_book(batch, iovcnt, XROW_IOVMAX); - if (iov == NULL) { - /* - * No space in the batch for - * this transaction, open a new - * batch for it and hope that it - * is sufficient to hold it. - */ - return req; - } - iovcnt += xlog_encode_row(*row, iov); - } - fio_batch_add(batch, iovcnt); - req = (struct wal_request *) STAILQ_NEXT(req, fifo); - } - return req; -} - /** * fio_batch_write() version with recovery specific * error injection. 
*/ -static inline int +static inline ssize_t wal_fio_batch_write(struct fio_batch *batch, int fd) { ERROR_INJECT(ERRINJ_WAL_WRITE, return 0); return fio_batch_write(batch, fd); } -static struct wal_request * -wal_write_batch(struct xlog *wal, struct fio_batch *batch, - struct wal_request *req, struct wal_request *end, - struct vclock *vclock) -{ - int rows_written = wal_fio_batch_write(batch, fileno(wal->f)); - wal->rows += rows_written; - while (req != end && rows_written-- != 0) { - vclock_follow(vclock, - req->rows[req->n_rows - 1]->server_id, - req->rows[req->n_rows - 1]->lsn); - req->res = 0; - req = (struct wal_request *) STAILQ_NEXT(req, fifo); - } - return req; -} - /** * Pop a bulk of requests to write to disk to process. * Block on the condition only if we have no other work to @@ -356,35 +307,159 @@ wal_writer_pop(struct wal_writer *writer) } } - static void wal_write_to_disk(struct recovery_state *r, struct wal_writer *writer, struct cmsg_fifo *input, struct cmsg_fifo *commit, struct cmsg_fifo *rollback) { - struct xlog **wal = &r->current_wal; + /* + * Input queue can only be empty on wal writer shutdown. + * In this case wal_opt_rotate can create an extra empty xlog. + */ + if (unlikely(STAILQ_EMPTY(input))) + return; + + /* Xlog is only rotated between queue processing */ + if (wal_opt_rotate(&r->current_wal, r, &writer->vclock) != 0) { + STAILQ_SPLICE(input, STAILQ_FIRST(input), fifo, rollback); + return; + } + + /* + * This code tries to write queued requests (=transactions) using as + * few I/O syscalls and memory copies as possible. For this reason + * writev(2) and `struct iovec[]` are used (see `struct fio_batch`). + * + * For each request (=transaction) each request row (=statement) is + * added to iov `batch`. A row can contain up to XROW_IOVMAX iovecs. + * A request can have an **unlimited** number of rows. Since the OS has + * a hard limit of `sysconf(_SC_IOV_MAX)` iovecs (usually 1024), + * a single batch can't fit huge transactions. 
Therefore, it is not + * possible to "atomically" write an entire transaction using the + * single writev(2) call. + * + * Request boundaries and batch boundaries are not connected at all + * in this code. Batches are flushed to disk as soon as they are full. + * In order to guarantee that a transaction is either fully written + * to file or isn't written at all, ftruncate(2) is used to shrink + * file to the last fully written request. The offset + * of each request relative to this write is stored in `struct wal_request`. + */ + + struct xlog *wal = r->current_wal; + /* The size of batched data */ + off_t batched_bytes = 0; + /* The size of written data */ + off_t written_bytes = 0; + /* Start new iov batch */ struct fio_batch *batch = writer->batch; + fio_batch_reset(batch); - struct wal_request *req = (struct wal_request *) STAILQ_FIRST(input); - struct wal_request *write_end = req; + /* + * Iterate over requests (transactions) + */ + struct wal_request *req; + for (req = (struct wal_request *) STAILQ_FIRST(input); + req != NULL; + req = (struct wal_request *) STAILQ_NEXT(req, fifo)) { + /* Save relative offset of request start */ + req->start_offset = batched_bytes; + req->end_offset = -1; - while (req) { - if (wal_opt_rotate(wal, r, &writer->vclock) != 0) - break; - struct wal_request *batch_end; - batch_end = wal_fill_batch(*wal, batch, writer->rows_per_wal, - req); - if (batch_end == req) - break; - write_end = wal_write_batch(*wal, batch, req, batch_end, - &writer->vclock); - if (batch_end != write_end) + /* + * Iterate over request rows (tx statements) + */ + struct xrow_header **row = req->rows; + for (; row < req->rows + req->n_rows; row++) { + /* Check batch has enough space to fit statement */ + if (unlikely(fio_batch_unused(batch) < XROW_IOVMAX)) { + /* + * No space in the batch for this statement, + * flush added statements and rotate batch. 
+ */ + assert(fio_batch_size(batch) > 0); + ssize_t nwr = wal_fio_batch_write(batch, + fileno(wal->f)); + if (nwr < 0) + goto done; /* to break outer loop */ + + /* Update cached file offset */ + written_bytes += nwr; + } + + /* Add the statement to iov batch */ + struct iovec *iov = fio_batch_book(batch, XROW_IOVMAX); + assert(iov != NULL); /* checked above */ + int iovcnt = xlog_encode_row(*row, iov); + batched_bytes += fio_batch_add(batch, iovcnt); + } + + /* Save relative offset of request end */ + req->end_offset = batched_bytes; + } + /* Flush remaining data in batch (if any) */ + if (fio_batch_size(batch) > 0) { + ssize_t nwr = wal_fio_batch_write(batch, fileno(wal->f)); + if (nwr >= 0) { + /* Update cached file offset */ + written_bytes += nwr; + } + } + +done: + /* + * Iterate over `input` queue and add all processed requests to + * `commit` queue and all others to `rollback` queue. + */ + struct wal_request *reqend = req; + for (req = (struct wal_request *) STAILQ_FIRST(input); + req != reqend; + req = (struct wal_request *) STAILQ_NEXT(req, fifo)) { + /* + * Check if request has been fully written to xlog. + */ + if (unlikely(req->end_offset == -1 || + req->end_offset > written_bytes)) { + /* + * This and all subsequent requests have failed + * to be written. Truncate xlog to the end of last + * successfully written request. + */ + + /* Calculate the number of bytes written past the last good request */ + off_t garbage_bytes = written_bytes - req->start_offset; + assert(garbage_bytes >= 0); + + /* Get absolute position */ + off_t good_offset = fio_lseek(fileno(wal->f), + -garbage_bytes, SEEK_CUR); + if (good_offset < 0) + panic_syserror("failed to get xlog position"); + + /* Truncate xlog */ + if (ftruncate(fileno(wal->f), good_offset) != 0) + panic_syserror("failed to rollback xlog"); + written_bytes = req->start_offset; + + /* Move tail to `rollback` queue. 
*/ + STAILQ_SPLICE(input, req, fifo, rollback); break; - req = write_end; + } + + /* Update internal vclock */ + vclock_follow(&writer->vclock, + req->rows[req->n_rows - 1]->server_id, + req->rows[req->n_rows - 1]->lsn); + /* Update row counter for wal_opt_rotate() */ + wal->rows += req->n_rows; + /* Mark request as successful for tx thread */ + req->res = 0; } + fiber_gc(); - STAILQ_SPLICE(input, write_end, fifo, rollback); + /* Move all processed requests to `commit` queue */ STAILQ_CONCAT(commit, input); + return; } /** WAL writer thread main loop. */ diff --git a/src/box/wal.h b/src/box/wal.h index c0272ac65cf91bf4cd4dda0c1c9919c46ba7b14b..9afa286fb8e4721bb645cb8ad80d99aa2002183a 100644 --- a/src/box/wal.h +++ b/src/box/wal.h @@ -46,6 +46,10 @@ struct wal_request: public cmsg { /* Auxiliary. */ int64_t res; struct fiber *fiber; + /* Relative position of the start of request (used for rollback) */ + off_t start_offset; + /* Relative position of the end of request (used for rollback) */ + off_t end_offset; int n_rows; struct xrow_header *rows[]; }; diff --git a/src/fio.c b/src/fio.c index f332c4aac1b57283b44b8086a1e1903dbb7938ff..8561d625a434a2ef217f4853da77fe37f198c0c2 100644 --- a/src/fio.c +++ b/src/fio.c @@ -161,99 +161,83 @@ fio_truncate(int fd, off_t offset) return rc; } + struct fio_batch * -fio_batch_alloc(int max_iov) +fio_batch_new(void) { + int max_iov = sysconf(_SC_IOV_MAX); + struct fio_batch *batch = (struct fio_batch *) malloc(sizeof(struct fio_batch) + - sizeof(struct iovec) * max_iov + - (max_iov / CHAR_BIT + 1)); + sizeof(struct iovec) * max_iov); if (batch == NULL) return NULL; - batch->bytes = batch->rows = batch->iovcnt = batch->max_rows = 0; + + fio_batch_reset(batch); batch->max_iov = max_iov; - batch->rowflag = (char *) (batch + 1) + sizeof(struct iovec) * max_iov; return batch; } void -fio_batch_start(struct fio_batch *batch, long max_rows) +fio_batch_delete(struct fio_batch *batch) { - batch->bytes = batch->rows = batch->iovcnt = 0; - 
batch->max_rows = max_rows; - memset(batch->rowflag, 0, batch->max_iov / CHAR_BIT + 1); + free(batch); } -void -fio_batch_add(struct fio_batch *batch, int iovcnt) +size_t +fio_batch_add(struct fio_batch *batch, int count) { - assert(iovcnt > 0); - assert(batch->max_rows > 0); - int i = batch->iovcnt; - batch->iovcnt += iovcnt; - for (; i < batch->iovcnt; i++) - batch->bytes += batch->iov[i].iov_len; - bit_set(batch->rowflag, batch->iovcnt); - batch->rows++; + assert(batch->iovcnt + count <= batch->max_iov); + + size_t total_bytes = 0; + struct iovec *iov = batch->iov + batch->iovcnt; + struct iovec *end = iov + count; + for (; iov != end; ++iov) { + assert(iov->iov_base != NULL && iov->iov_len > 0); + total_bytes += iov->iov_len; + } + batch->iovcnt += count; + batch->bytes += total_bytes; + return total_bytes; } -int -fio_batch_write(struct fio_batch *batch, int fd) +/** + * Rotate batch after partial write. + */ +static inline void +fio_batch_rotate(struct fio_batch *batch, size_t bytes_written) { - ssize_t bytes_written = fio_writev(fd, batch->iov, batch->iovcnt); - if (bytes_written <= 0) - return 0; + /* + * writev(2) usually writes all data in full on local filesystems. 
+ */ + if (likely(bytes_written == batch->bytes)) { + /* Full write */ + fio_batch_reset(batch); + return; + } - if (bytes_written == batch->bytes) - return batch->rows; /* returns the number of written rows */ + assert(bytes_written < batch->bytes); /* Partial write */ + batch->bytes -= bytes_written; - say_warn("fio_batch_write, [%s]: partial write," - " wrote %jd out of %jd bytes", - fio_filename(fd), - (intmax_t) bytes_written, (intmax_t) batch->bytes); + struct iovec *iov = batch->iov; + struct iovec *iovend = iov + batch->iovcnt; + for (; bytes_written >= iov->iov_len; ++iov) + bytes_written -= iov->iov_len; + + assert(iov < iovend); /* Partial write */ + iov->iov_base = (char *) iov->iov_base + bytes_written; + iov->iov_len -= bytes_written; + memmove(batch->iov, iov, iovend - iov); + batch->iovcnt = iovend - iov; +} - /* Iterate over end of row flags */ - struct bit_iterator bit_it; - bit_iterator_init(&bit_it, batch->rowflag, - batch->max_iov / CHAR_BIT + 1, 1); - size_t row_last_iov = bit_iterator_next(&bit_it); +ssize_t +fio_batch_write(struct fio_batch *batch, int fd) +{ + ssize_t bytes_written = fio_writev(fd, batch->iov, batch->iovcnt); + if (unlikely(bytes_written <= 0)) + return -1; /* Error */ - int good_rows = 0; /* the number of fully written rows */ - ssize_t good_bytes = 0; /* the number of bytes in fully written rows */ - ssize_t row_bytes = 0; /* the number of bytes in the current row */ - struct iovec *iov = batch->iov; - while (iov < batch->iov + batch->iovcnt) { - if (good_bytes + row_bytes + iov->iov_len > bytes_written) - break; - row_bytes += iov->iov_len; - if ((iov - batch->iov) == row_last_iov) { - /* the end of current row */ - good_bytes += row_bytes; - row_bytes = 0; - good_rows++; - row_last_iov = bit_iterator_next(&bit_it); - } - iov++; - } - /* - * Unwind file position back to ensure we do not leave - * partially written rows. 
- */ - off_t good_offset = fio_lseek(fd, - good_bytes - bytes_written, SEEK_CUR); - /* - * The caller may choose to close the file right after - * a partial write. Don't take chances and make sure that - * there is no garbage at the end of file if it happens. - */ - if (good_offset != -1) - (void) fio_truncate(fd, good_offset); - /* - * writev() doesn't set errno in case of a partial write. - * If nothing else from the above failed, set errno to - * EAGAIN. - */ - if (! errno) - errno = EAGAIN; - return good_rows; /* returns the number of written rows */ + fio_batch_rotate(batch, bytes_written); + return bytes_written; } diff --git a/src/fio.h b/src/fio.h index 2fc90da2f8e98960254a8525eb4a31ec3ce6e6df..9b8e11640340ae088fdaeeec051f48bd45a1fa9b 100644 --- a/src/fio.h +++ b/src/fio.h @@ -35,9 +35,11 @@ * the requested number of bytes), log errors nicely, provide batch * writes. */ +#include <stddef.h> #include <sys/types.h> #include <stdbool.h> #include <sys/uio.h> +#include <assert.h> #if defined(__cplusplus) extern "C" { @@ -157,59 +159,63 @@ struct fio_batch /** Total number of bytes in batched rows. */ ssize_t bytes; /** Total number of batched rows.*/ - int rows; - /** Total number of I/O vectors */ int iovcnt; /** A cap on how many rows can be batched. Can be set to INT_MAX. */ - int max_rows; - /** A system cap on how many rows can be batched. */ int max_iov; - /** - * End of row flags for each iov (bitset). fio_write() tries to - * write {iov, iov, iov with flag} blocks atomically. - */ - char *rowflag; /* Batched rows. */ struct iovec iov[]; }; struct fio_batch * -fio_batch_alloc(int max_iov); +fio_batch_new(void); -/** Begin a new batch write. Set a cap on the number of rows in the batch. 
*/ void -fio_batch_start(struct fio_batch *batch, long max_rows); +fio_batch_delete(struct fio_batch *batch); + +static inline void +fio_batch_reset(struct fio_batch *batch) +{ + batch->bytes = 0; + batch->iovcnt = 0; +} + +static inline size_t +fio_batch_size(struct fio_batch *batch) +{ + return batch->bytes; +} + +static inline int +fio_batch_unused(struct fio_batch *batch) +{ + return batch->max_iov - batch->iovcnt; +} /** * Add a row to a batch. * @pre iovcnt is the number of iov elements previously * booked with fio_batch_book() and filled with data - * @pre fio_batch_is_full() == false */ -void -fio_batch_add(struct fio_batch *batch, int iovcnt); +size_t +fio_batch_add(struct fio_batch *batch, int count); /** - * Get a pointer to struct iov * in the batch - * beyond batch->iovcnt + offset. Ensure - * the iov has at least 'count' elements. + * Ensure the iov has at least 'count' elements. */ static inline struct iovec * -fio_batch_book(struct fio_batch *batch, int offset, int count) +fio_batch_book(struct fio_batch *batch, int count) { - if (batch->iovcnt + offset + count <= batch->max_iov) - return batch->iov + batch->iovcnt + offset; - return 0; + if (batch->iovcnt + count <= batch->max_iov) + return batch->iov + batch->iovcnt; + return NULL; } /** - * Write all rows stacked into the batch. - * In case of error, seeks back to the end of - * the last fully written row. - * - * @return The number of rows written. + * Write batch to fd using writev(2) and rotate batch. + * In case of partial write batch will contain remaining data. + * \sa fio_writev() */ -int +ssize_t fio_batch_write(struct fio_batch *batch, int fd); #if defined(__cplusplus) diff --git a/test/box/transaction.result b/test/box/transaction.result index 4291368149314c6e2966a398adcffd35063f31a5..e80b2c7e3a3e72da2df03e640e1840b45285077c 100644 --- a/test/box/transaction.result +++ b/test/box/transaction.result @@ -365,6 +365,33 @@ message:match('does not exist') --- - does not exist ... 
+if not status then box.rollback() end +--- +... test = nil --- ... +--# setopt delimiter ';' +function tx_limit(n) + box.begin() + for i=0,n do + box.space.test:insert{i} + end + box.commit() +end; +--- +... +--# setopt delimiter '' +_ = box.schema.space.create('test'); +--- +... +_ = box.space.test:create_index('primary'); +--- +... +tx_limit(10000) +--- +... +box.space.test:len() +--- +- 10001 +... diff --git a/test/box/transaction.test.lua b/test/box/transaction.test.lua index 23ebed4f8da857e3b49e27c3fb8991cf6449a21e..bb02ccf48d3b5116e5bf2770dca65ff1bece74b8 100644 --- a/test/box/transaction.test.lua +++ b/test/box/transaction.test.lua @@ -170,4 +170,19 @@ box.space.test:drop() status, message = pcall(function() box.begin() test:put{1} test:put{2} box.commit() end) status message:match('does not exist') +if not status then box.rollback() end test = nil + +--# setopt delimiter ';' +function tx_limit(n) + box.begin() + for i=0,n do + box.space.test:insert{i} + end + box.commit() +end; +--# setopt delimiter '' +_ = box.schema.space.create('test'); +_ = box.space.test:create_index('primary'); +tx_limit(10000) +box.space.test:len()