diff --git a/src/box/recovery.cc b/src/box/recovery.cc index 6600e8b4eb0aec10c3be42bc7dcda80d6985eb76..6ed33f81a5e2716e585e156f597798e01f4917e7 100644 --- a/src/box/recovery.cc +++ b/src/box/recovery.cc @@ -116,7 +116,7 @@ const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL }; /* {{{ LSN API */ -static void +void fill_lsn(struct recovery_state *r, struct xrow_header *row) { if (row->server_id == 0) { @@ -551,16 +551,8 @@ recovery_stop_local(struct recovery_state *r) * in the data state. */ -struct wal_write_request { - STAILQ_ENTRY(wal_write_request) wal_fifo_entry; - /* Auxiliary. */ - int64_t res; - struct fiber *fiber; - struct xrow_header *row; -}; - /* Context of the WAL writer thread. */ -STAILQ_HEAD(wal_fifo, wal_write_request); +STAILQ_HEAD(wal_fifo, wal_request); struct wal_writer { @@ -626,7 +618,7 @@ wal_schedule_queue(struct wal_fifo *queue) * Can't use STAILQ_FOREACH since fiber_call() * destroys the list entry. */ - struct wal_write_request *req, *tmp; + struct wal_request *req, *tmp; STAILQ_FOREACH_SAFE(req, queue, wal_fifo_entry, tmp) fiber_call(req->fiber); } @@ -654,7 +646,7 @@ wal_schedule(ev_loop * /* loop */, ev_async *watcher, int /* event */) * in reverse order, performing a playback of the * in-memory database state. */ - STAILQ_REVERSE(&rollback, wal_write_request, wal_fifo_entry); + STAILQ_REVERSE(&rollback, wal_request, wal_fifo_entry); wal_schedule_queue(&rollback); } @@ -841,19 +833,33 @@ wal_opt_rotate(struct xlog **wal, struct recovery_state *r, return l ? 0 : -1; } -static struct wal_write_request * +static struct wal_request * wal_fill_batch(struct xlog *wal, struct fio_batch *batch, int rows_per_wal, - struct wal_write_request *req) + struct wal_request *req) { int max_rows = rows_per_wal - wal->rows; /* Post-condition of successful wal_opt_rotate(). */ assert(max_rows > 0); fio_batch_start(batch, max_rows); - struct iovec iov[XROW_IOVMAX]; - while (req != NULL && !fio_batch_has_space(batch, nelem(iov))) { - int iovcnt = xlog_encode_row(req->row, iov); - fio_batch_add(batch, iov, iovcnt); + while (req != NULL && batch->rows < batch->max_rows) { + int iovcnt = 0; + struct iovec *iov; + struct xrow_header **row = req->rows; + for (; row < req->rows + req->n_rows; row++) { + iov = fio_batch_book(batch, iovcnt, XROW_IOVMAX); + if (iov == NULL) { + /* + * No space in the batch for + * this transaction, open a new + * batch for it and hope that it + * is sufficient to hold it. + */ + return req; + } + iovcnt += xlog_encode_row(*row, iov); + } + fio_batch_add(batch, iovcnt); req = STAILQ_NEXT(req, wal_fifo_entry); } return req; @@ -870,15 +876,17 @@ wal_fio_batch_write(struct fio_batch *batch, int fd) return fio_batch_write(batch, fd); } -static struct wal_write_request * +static struct wal_request * wal_write_batch(struct xlog *wal, struct fio_batch *batch, - struct wal_write_request *req, struct wal_write_request *end, + struct wal_request *req, struct wal_request *end, struct vclock *vclock) { int rows_written = wal_fio_batch_write(batch, fileno(wal->f)); wal->rows += rows_written; while (req != end && rows_written-- != 0) { - vclock_follow(vclock, req->row->server_id, req->row->lsn); + vclock_follow(vclock, + req->rows[req->n_rows - 1]->server_id, + req->rows[req->n_rows - 1]->lsn); req->res = 0; req = STAILQ_NEXT(req, wal_fifo_entry); } @@ -893,15 +901,17 @@ wal_write_to_disk(struct recovery_state *r, struct wal_writer *writer, struct xlog **wal = &r->current_wal; struct fio_batch *batch = writer->batch; - struct wal_write_request *req = STAILQ_FIRST(input); - struct wal_write_request *write_end = req; + struct wal_request *req = STAILQ_FIRST(input); + struct wal_request *write_end = req; while (req) { if (wal_opt_rotate(wal, r, &writer->vclock) != 0) break; - struct wal_write_request *batch_end; + struct wal_request *batch_end; batch_end = wal_fill_batch(*wal, batch, writer->rows_per_wal, req); + if (batch_end == req) + break; write_end = wal_write_batch(*wal, batch, req, batch_end, &writer->vclock); if (batch_end != write_end) @@ -958,13 +968,8 @@ wal_writer_thread(void *worker_args) * to be written to disk and wait until this task is completed. */ int64_t -wal_write(struct recovery_state *r, struct xrow_header *row) +wal_write(struct recovery_state *r, struct wal_request *req) { - /* - * Bump current LSN even if wal_mode = NONE, so that - * snapshots still works with WAL turned off. - */ - fill_lsn(r, row); if (r->wal_mode == WAL_NONE) return vclock_sum(&r->vclock); @@ -972,14 +977,8 @@ wal_write(struct recovery_state *r, struct xrow_header *row) struct wal_writer *writer = r->writer; - struct wal_write_request *req = (struct wal_write_request *) - region_alloc(&fiber()->gc, sizeof(struct wal_write_request)); - req->fiber = fiber(); req->res = -1; - req->row = row; - row->tm = ev_now(loop()); - row->sync = 0; (void) tt_pthread_mutex_lock(&writer->mutex); diff --git a/src/box/recovery.h b/src/box/recovery.h index 9aae1746b39b3d230102869f77f8fdc222936587..8b1b654027475d92a372917c71c4dbd8cafdccb4 100644 --- a/src/box/recovery.h +++ b/src/box/recovery.h @@ -34,6 +34,7 @@ #include "trivia/util.h" #include "third_party/tarantool_ev.h" +#include "third_party/queue.h" #include "xlog.h" #include "vclock.h" #include "tt_uuid.h" @@ -140,7 +141,20 @@ void recovery_stop_local(struct recovery_state *r); void recovery_finalize(struct recovery_state *r, enum wal_mode mode, int rows_per_wal); -int64_t wal_write(struct recovery_state *r, struct xrow_header *packet); +void +fill_lsn(struct recovery_state *r, struct xrow_header *row); + +struct wal_request { + STAILQ_ENTRY(wal_request) wal_fifo_entry; + /* Auxiliary. */ + int64_t res; + struct fiber *fiber; + int n_rows; + struct xrow_header *rows[]; +}; + +int64_t +wal_write(struct recovery_state *r, struct wal_request *req); void recovery_setup_panic(struct recovery_state *r, bool on_snap_error, bool on_wal_error); void recovery_apply_row(struct recovery_state *r, struct xrow_header *packet); diff --git a/src/box/txn.cc b/src/box/txn.cc index 4a87b2bc37272df405cddeea74802e86f21d36fb..f2947f4bb80693ed7f8552c1ed5f7ec959612c8b 100644 --- a/src/box/txn.cc +++ b/src/box/txn.cc @@ -54,6 +54,7 @@ txn_add_redo(struct txn_stmt *stmt, struct request *request) stmt->row = request->header; if (request->header != NULL) return; + /* Create a redo log row for Lua requests */ struct xrow_header *row= (struct xrow_header *) region_alloc0(&fiber()->gc, sizeof(struct xrow_header)); @@ -144,17 +145,28 @@ txn_commit(struct txn *txn) if (txn->engine) txn->engine->prepare(txn); + struct wal_request *req = (struct wal_request *) + region_alloc(&fiber()->gc, sizeof(struct wal_request) + + sizeof(struct xrow_header) * txn->n_stmts); + req->n_rows = 0; + rlist_foreach_entry(stmt, &txn->stmts, next) { if (stmt->row == NULL) continue; + /* + * Bump current LSN even if wal_mode = NONE, so that + * snapshots still works with WAL turned off. + */ + fill_lsn(recovery, stmt->row); + stmt->row->tm = ev_now(loop()); + req->rows[req->n_rows++] = stmt->row; + } + if (req->n_rows) { ev_tstamp start = ev_now(loop()), stop; - int64_t res = wal_write(recovery, stmt->row); + int64_t res = wal_write(recovery, req); stop = ev_now(loop()); - if (stop - start > too_long_threshold && stmt->row != NULL) { - say_warn("too long %s: %.3f sec", - iproto_type_name(stmt->row->type), - stop - start); - } + if (stop - start > too_long_threshold) + say_warn("too long WAL write: %.3f sec", stop - start); if (res < 0) tnt_raise(LoggedError, ER_WAL_IO); txn->signature = res; diff --git a/src/box/xlog.cc b/src/box/xlog.cc index 06eca9f56b20419e5783d537dbcce40d8abf3dcb..1157b56cf42296d9f2d5c0c362858685106adbe8 100644 --- a/src/box/xlog.cc +++ b/src/box/xlog.cc @@ -435,12 +435,11 @@ row_reader(FILE *f, struct xrow_header *row) int xlog_encode_row(const struct xrow_header *row, struct iovec *iov) { - int iovcnt = xrow_header_encode(row, iov + 1) + 1; - char *fixheader = (char *) region_alloc(&fiber()->gc, - XLOG_FIXHEADER_SIZE); - uint32_t len = 0; + int iovcnt = xrow_header_encode(row, iov, XLOG_FIXHEADER_SIZE); + char *fixheader = (char *) iov[0].iov_base; + uint32_t len = iov[0].iov_len - XLOG_FIXHEADER_SIZE; uint32_t crc32p = 0; - uint32_t crc32c = 0; + uint32_t crc32c = crc32_calc(0, fixheader + XLOG_FIXHEADER_SIZE, len); for (int i = 1; i < iovcnt; i++) { crc32c = crc32_calc(crc32c, (const char *) iov[i].iov_base, iov[i].iov_len); @@ -460,8 +459,6 @@ xlog_encode_row(const struct xrow_header *row, struct iovec *iov) if (padding > 0) data = mp_encode_strl(data, padding - 1) + padding - 1; assert(data == fixheader + XLOG_FIXHEADER_SIZE); - iov[0].iov_base = fixheader; - iov[0].iov_len = XLOG_FIXHEADER_SIZE; assert(iovcnt <= XROW_IOVMAX); return iovcnt; diff --git a/src/box/xrow.cc b/src/box/xrow.cc index cea768767d8d538ad871eefae5013d73fa62cded..be08cd8ea6ab2ba844a921b7056b6096add788d4 100644 --- a/src/box/xrow.cc +++ b/src/box/xrow.cc @@ -105,10 +105,13 @@ xrow_decode_uuid(const char **pos, struct tt_uuid *out) } int -xrow_header_encode(const struct xrow_header *header, struct iovec *out) +xrow_header_encode(const struct xrow_header *header, struct iovec *out, + size_t fixheader_len) { /* allocate memory for sign + header */ - char *data = (char *) region_alloc(&fiber()->gc, HEADER_LEN_MAX); + out->iov_base = region_alloc(&fiber()->gc, HEADER_LEN_MAX + + fixheader_len); + char *data = (char *) out->iov_base + fixheader_len; /* Header */ char *d = data + 1; /* Skip 1 byte for MP_MAP */ @@ -118,12 +121,13 @@ xrow_header_encode(const struct xrow_header *header, struct iovec *out) d = mp_encode_uint(d, header->type); map_size++; } - +#if 0 if (header->sync) { d = mp_encode_uint(d, IPROTO_SYNC); d = mp_encode_uint(d, header->sync); map_size++; } +#endif if (header->server_id) { d = mp_encode_uint(d, IPROTO_SERVER_ID); @@ -145,8 +149,7 @@ xrow_header_encode(const struct xrow_header *header, struct iovec *out) assert(d <= data + HEADER_LEN_MAX); mp_encode_map(data, map_size); - out->iov_base = data; - out->iov_len = (d - data); + out->iov_len = d - (char *) out->iov_base; out++; memcpy(out, header->body, sizeof(*out) * header->bodycnt); @@ -164,18 +167,15 @@ int xrow_to_iovec(const struct xrow_header *row, struct iovec *out) { static const int iov0_len = mp_sizeof_uint(UINT32_MAX); - int iovcnt = xrow_header_encode(row, out + 1) + 1; - char *fixheader = (char *) region_alloc(&fiber()->gc, iov0_len); - uint32_t len = 0; - for (int i = 1; i < iovcnt; i++) + int iovcnt = xrow_header_encode(row, out, iov0_len); + ssize_t len = -iov0_len; + for (int i = 0; i < iovcnt; i++) len += out[i].iov_len; /* Encode length */ - char *data = fixheader; + char *data = (char *) out[0].iov_base; *(data++) = 0xce; /* MP_UINT32 */ *(uint32_t *) data = mp_bswap_u32(len); - out[0].iov_base = fixheader; - out[0].iov_len = iov0_len; assert(iovcnt <= XROW_IOVMAX); return iovcnt; diff --git a/src/box/xrow.h b/src/box/xrow.h index 9cf1327779c3f39ba2964b68510cb2fe98d3ab0d..537fa335b9982163121bf47e2042daf3eb9cdc24 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -39,7 +39,7 @@ extern "C" { enum { XROW_HEADER_IOVMAX = 1, XROW_BODY_IOVMAX = 2, - XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX + 1 + XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX }; struct xrow_header { @@ -66,7 +66,7 @@ xrow_encode_uuid(char *pos, const struct tt_uuid *in); int xrow_header_encode(const struct xrow_header *header, - struct iovec *out); + struct iovec *out, size_t fixheader_len); int xrow_to_iovec(const struct xrow_header *row, struct iovec *out); diff --git a/src/fio.c b/src/fio.c index 2a184161f356758fbe4ba1da9983455fce21c2fb..8c2eac65da4f287627303b554eb694e71c295eaa 100644 --- a/src/fio.c +++ b/src/fio.c @@ -183,15 +183,14 @@ fio_batch_start(struct fio_batch *batch, long max_rows) } void -fio_batch_add(struct fio_batch *batch, const struct iovec *iov, int iovcnt) +fio_batch_add(struct fio_batch *batch, int iovcnt) { - assert(!fio_batch_has_space(batch, iovcnt)); assert(iovcnt > 0); assert(batch->max_rows > 0); - for (int i = 0; i < iovcnt; i++) { - batch->iov[batch->iovcnt++] = iov[i]; - batch->bytes += iov[i].iov_len; - } + int i = batch->iovcnt; + batch->iovcnt += iovcnt; + for (; i < batch->iovcnt; i++) + batch->bytes += batch->iov[i].iov_len; bit_set(batch->rowflag, batch->iovcnt); batch->rows++; } diff --git a/src/fio.h b/src/fio.h index db7a59362c0118aca3b8cc3bfd6999a7ba662548..a00526af9d33f168f6c50d7633dd930e3f25dd1f 100644 --- a/src/fio.h +++ b/src/fio.h @@ -178,19 +178,27 @@ fio_batch_alloc(int max_iov); void fio_batch_start(struct fio_batch *batch, long max_rows); -static inline bool -fio_batch_has_space(struct fio_batch *batch, int iovcnt) -{ - return batch->iovcnt + iovcnt > batch->max_iov || - batch->rows >= batch->max_rows; -} - /** * Add a row to a batch. + * @pre iovcnt is the number of iov elements previously + * booked with fio_batch_book() and filled with data * @pre fio_batch_is_full() == false */ void -fio_batch_add(struct fio_batch *batch, const struct iovec *iov, int iovcnt); +fio_batch_add(struct fio_batch *batch, int iovcnt); + +/** + * Get a pointer to struct iov * in the batch + * beyond batch->iovcnt + offset. Ensure + * the iov has at least 'count' elements. + */ +static inline struct iovec * +fio_batch_book(struct fio_batch *batch, int offset, int count) +{ + if (batch->iovcnt + offset + count <= batch->max_iov) + return batch->iov + batch->iovcnt + offset; + return 0; +} /** * Write all rows stacked into the batch.