diff --git a/.gitignore b/.gitignore index 18357cfe4ad81aaf61de8993f9a5a6e71a14d3e0..39857d920e7e5df06d8b693caea454346ca32e13 100644 --- a/.gitignore +++ b/.gitignore @@ -62,6 +62,7 @@ test/connector_c/xlog test/lib/*.pyc test/lib/*/*.pyc test/unit/*.test +test/unit/fiob test/var third_party/luajit/src/luajit third_party/luajit/lib/vmdef.lua diff --git a/include/box/box.h b/include/box/box.h index 2070dd9bca43080584f26d524c31a38c00e94e35..f1ca11fa185da694ec8c7efc5421813d7c4ab9b2 100644 --- a/include/box/box.h +++ b/include/box/box.h @@ -83,7 +83,7 @@ void box_lua_load_cfg(struct lua_State *L); * Iterate over all spaces and save them to the * snapshot file. */ -void box_snapshot(struct log_io *, struct fio_batch *batch); +void box_snapshot(struct log_io *); /** * Spit out some basic module status (master/slave, etc. */ diff --git a/include/recovery.h b/include/recovery.h index 8228b4d104096dda93a9f649a9c8e5e0a0fc40c2..8978472d65478769895d65a7bb808af84ffbafd5 100644 --- a/include/recovery.h +++ b/include/recovery.h @@ -145,11 +145,10 @@ void recovery_stop_remote(struct recovery_state *r); struct fio_batch; -void snapshot_write_row(struct log_io *i, struct fio_batch *batch, +void snapshot_write_row(struct log_io *i, const char *metadata, size_t metadata_size, const char *data, size_t data_size); -void snapshot_save(struct recovery_state *r, - void (*loop) (struct log_io *, struct fio_batch *)); +void snapshot_save(struct recovery_state *r, void (*loop) (struct log_io *)); #if defined(__cplusplus) } /* extern "C" */ diff --git a/src/box/box.cc b/src/box/box.cc index fd0ad87c8e4e2c186f8ddc08d54dbc99f8c9204f..49ef56113566d5e598b71f07f346efcd9230cb36 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -355,7 +355,7 @@ box_init(bool init_storage) } static void -snapshot_write_tuple(struct log_io *l, struct fio_batch *batch, +snapshot_write_tuple(struct log_io *l, uint32_t n, struct tuple *tuple) { struct box_snap_row header; @@ -363,35 +363,28 @@ snapshot_write_tuple(struct log_io *l, struct fio_batch *batch, header.tuple_size = tuple->field_count; header.data_size = tuple->bsize; - snapshot_write_row(l, batch, (const char *) &header, sizeof(header), + snapshot_write_row(l, (const char *) &header, sizeof(header), tuple->data, tuple->bsize); } -struct snapshot_space_param { - struct log_io *l; - struct fio_batch *batch; -}; - static void snapshot_space(struct space *sp, void *udata) { struct tuple *tuple; - struct snapshot_space_param *ud = (struct snapshot_space_param *) udata; + struct log_io *l = (struct log_io *)udata; Index *pk = space_index(sp, 0); struct iterator *it = pk->position();; pk->initIterator(it, ITER_ALL, NULL, 0); while ((tuple = it->next(it))) - snapshot_write_tuple(ud->l, ud->batch, space_n(sp), tuple); + snapshot_write_tuple(l, space_n(sp), tuple); } void -box_snapshot(struct log_io *l, struct fio_batch *batch) +box_snapshot(struct log_io *l) { - struct snapshot_space_param ud = { l, batch }; - - space_foreach(snapshot_space, &ud); + space_foreach(snapshot_space, l); } void diff --git a/src/fiob.c b/src/fiob.c index 1ab63c106b5a25d7503e8ac2af75be47404ee7e2..09cdc7757e4c10d5f9135c9306a020aa0266c2c6 100644 --- a/src/fiob.c +++ b/src/fiob.c @@ -121,9 +121,13 @@ fiob_flushb(struct fiob *f) return -1; assert(cur + f->bfill >= size); + if (fiob_writef(f, f->buf, f->bsize) < 0) return -1; - lseek(f->fd, cur + f->bfill, SEEK_SET); + + if (lseek(f->fd, cur + f->bfill, SEEK_SET) == (off_t)-1) + return -1; + int res = ftruncate(f->fd, cur + f->bfill); f->bfill = 0; return res; @@ -149,44 +153,41 @@ fiob_write(void *cookie, const char *buf, size_t len) return len; } - /* data is longer than buffer */ - if (f->bfill < f->bsize) { - memcpy(f->buf + f->bfill, buf, - len - (f->bsize - f->bfill)); - wrdone = fiob_writef(f, f->buf, f->bsize); + /* buffer is full */ + if (f->bfill >= f->bsize) { + wrdone = fiob_writef(f, f->buf, f->bsize); if (wrdone < 0) return wrdone; - if (wrdone < f->bsize) { - if (wrdone <= f->bfill) { - f->bfill -= wrdone; - memcpy(f->buf, - f->buf + wrdone, - f->bfill - wrdone); - return fiob_write(cookie, buf, len); - } - wrdone -= f->bfill; - } - wrdone -= f->bfill; f->bfill = 0; - buf += wrdone; - len -= wrdone; - - if (len > 0) { - wrdone += fiob_write(cookie, buf, len); - } - return wrdone; + return fiob_write(cookie, buf, len); } + /* data is longer than buffer */ + memcpy(f->buf + f->bfill, buf, f->bsize - f->bfill); + wrdone = fiob_writef(f, f->buf, f->bsize); + + + if (wrdone < 0) return wrdone; - if (wrdone < f->bsize) { - f->bfill = f->bsize - wrdone; - memcpy(f->buf, f->buf + wrdone, f->bfill); + + wrdone -= f->bfill; + + f->bfill = 0; + buf += wrdone; + len -= wrdone; + + + if (len > 0) { + ssize_t wrtail = fiob_write(cookie, buf, len); + if (wrtail < 0) + return wrtail; + wrdone += wrtail; } - return fiob_write(cookie, buf, len); + return wrdone; } return fiob_writef(f, buf, len); diff --git a/src/log_io.cc b/src/log_io.cc index 01a5e688096cbeabcca2ec369444e5c26503e63e..977a24bb1c36c05aa2310a90598f14d1950c9713 100644 --- a/src/log_io.cc +++ b/src/log_io.cc @@ -645,7 +645,11 @@ log_io_open_for_write(struct log_dir *dir, int64_t lsn, enum log_suffix suffix) * Open the <lsn>.<suffix>.inprogress file. If it exists, * open will fail. */ - f = fiob_open_flags(filename, dir->open_wflags, "wxd"); + if (dir == &snap_dir) { + f = fiob_open_flags(filename, dir->open_wflags, "wxd"); + } else { + f = fiob_open_flags(filename, dir->open_wflags, "wx"); + } if (!f) goto error; diff --git a/src/recovery.cc b/src/recovery.cc index 3b20a649dd6722e91f1ceba636a36c8384429135..255526d89000d18a2361cfa6e78d2c53af632162 100644 --- a/src/recovery.cc +++ b/src/recovery.cc @@ -1146,23 +1146,11 @@ wal_write(struct recovery_state *r, int64_t lsn, uint64_t cookie, /* {{{ SAVE SNAPSHOT and tarantool_box --cat */ -static void -snap_write_batch(struct fio_batch *batch, FILE *f) -{ - int rows_written = fio_batch_writef(batch, f); - if (rows_written != batch->rows) { - say_error("partial write: %d out of %d rows", - rows_written, batch->rows); - panic_syserror("fio_batch_write"); - } -} - void -snapshot_write_row(struct log_io *l, struct fio_batch *batch, +snapshot_write_row(struct log_io *l, const char *metadata, size_t metadata_len, const char *data, size_t data_len) { - static int rows; static uint64_t bytes; ev_tstamp elapsed; static ev_tstamp last = 0; @@ -1175,70 +1163,65 @@ snapshot_write_row(struct log_io *l, struct fio_batch *batch, metadata, metadata_len, data, data_len); header_v11_sign(&row->header); - fio_batch_add(batch, row, row_v11_size(row)); - bytes += row_v11_size(row); - if (++rows % 100000 == 0) - say_crit("%.1fM rows written", rows / 1000000.); + size_t written = fwrite(row, 1, row_v11_size(row), l->f); - if (fio_batch_is_full(batch) || - bytes > recovery_state->snap_io_rate_limit) { + if (written != row_v11_size(row)) { + say_error("Can't write row (%zu bytes)", row_v11_size(row)); + panic_syserror("snapshot_write_row"); + } - snap_write_batch(batch, l->f); - fio_batch_start(batch, INT_MAX); - prelease_after(fiber->gc_pool, 128 * 1024); - if (recovery_state->snap_io_rate_limit != UINT64_MAX) { - if (last == 0) { - /* - * Remember the time of first - * write to disk. - */ - ev_now_update(); - last = ev_now(); - } - /** - * If io rate limit is set, flush the - * filesystem cache, otherwise the limit is - * not really enforced. - */ - if (bytes > recovery_state->snap_io_rate_limit) - fdatasync(fileno(l->f)); - } - while (bytes >= recovery_state->snap_io_rate_limit) { - ev_now_update(); - /* - * How much time have passed since - * last write? - */ - elapsed = ev_now() - last; + bytes += written; + + + prelease_after(fiber->gc_pool, 128 * 1024); + + if (recovery_state->snap_io_rate_limit != UINT64_MAX) { + if (last == 0) { /* - * If last write was in less than - * a second, sleep until the - * second is reached. + * Remember the time of first + * write to disk. */ - if (elapsed < 1) - usleep(((1 - elapsed) * 1000000)); - ev_now_update(); last = ev_now(); - bytes -= recovery_state->snap_io_rate_limit; } + /** + * If io rate limit is set, flush the + * filesystem cache, otherwise the limit is + * not really enforced. + */ + if (bytes > recovery_state->snap_io_rate_limit) + fdatasync(fileno(l->f)); + } + while (bytes >= recovery_state->snap_io_rate_limit) { + ev_now_update(); + /* + * How much time have passed since + * last write? + */ + elapsed = ev_now() - last; + /* + * If last write was in less than + * a second, sleep until the + * second is reached. + */ + if (elapsed < 1) + usleep(((1 - elapsed) * 1000000)); + + ev_now_update(); + last = ev_now(); + bytes -= recovery_state->snap_io_rate_limit; } } void -snapshot_save(struct recovery_state *r, - void (*f) (struct log_io *, struct fio_batch *)) +snapshot_save(struct recovery_state *r, void (*f) (struct log_io *)) { struct log_io *snap; snap = log_io_open_for_write(r->snap_dir, r->confirmed_lsn, INPROGRESS); if (snap == NULL) panic_status(errno, "Failed to save snapshot: failed to open file in write mode."); - struct fio_batch *batch = fio_batch_alloc(sysconf(_SC_IOV_MAX)); - if (batch == NULL) - panic_syserror("fio_batch_alloc"); - fio_batch_start(batch, INT_MAX); /* * While saving a snapshot, snapshot name is set to * <lsn>.snap.inprogress. When done, the snapshot is @@ -1248,12 +1231,8 @@ snapshot_save(struct recovery_state *r, format_filename(r->snap_dir, r->confirmed_lsn, NONE)); if (f) - f(snap, batch); - - if (batch->rows) - snap_write_batch(batch, snap->f); + f(snap); - free(batch); log_io_close(&snap); say_info("done"); diff --git a/test/unit/fiob.c b/test/unit/fiob.c index c487feb4d5d53770e6e63a60a7aaed965617d36a..2969d0c0ba4758b78c593f355904760aa9837775 100644 --- a/test/unit/fiob.c +++ b/test/unit/fiob.c @@ -22,7 +22,7 @@ -#define PLAN 29 +#define PLAN 47 #define ITEMS 7 @@ -142,6 +142,69 @@ main(void) is(fclose(f), 0, "fclose"); } + { + FILE *f = fiob_open(catfile(td, "tm"), "wxd"); + isnt(f, NULL, "open big file"); + size_t done = fwrite("Hello, world\n", 1, 13, f); + is(done, 13, "Hello world is written (%zu bytes)", done); + + size_t i; + + for (i = 0; i < 1000000; i++) { + done += fwrite("Hello, world\n", 1, 13, f); + } + is(done, 13 + 13 * 1000000, "all bytes were written"); + is(fclose(f), 0, "fclose"); + + f = fopen(catfile(td, "tm"), "r"); + isnt(f, NULL, "reopen file for reading"); + done = 0; + for (i = 0; i < 1000000 + 1; i++) { + buf[0] = 0; + fgets(buf, 4096, f); + if (strcmp(buf, "Hello, world\n") == 0) + done++; + } + is(done, 1000000 + 1, "all records were written properly"); + + is(fgets(buf, 4096, f), NULL, "eof"); + isnt(feof(f), 0, "feof"); + is(fclose(f), 0, "fclose"); + } + { + FILE *f = fiob_open(catfile(td, "tm"), "w+d"); + setvbuf(f, NULL, _IONBF, 0); + isnt(f, NULL, "open big file"); + size_t done = fwrite("Hello, world\n", 1, 13, f); + is(done, 13, "Hello world is written (%zu bytes)", done); + + size_t i; + + for (i = 0; i < 1000000; i++) { + done += fwrite("Hello, world\n", 1, 13, f); + } + is(done, 13 + 13 * 1000000, "all bytes were written"); + is(fclose(f), 0, "fclose"); + + f = fopen(catfile(td, "tm"), "r"); + isnt(f, NULL, "reopen file for reading"); + done = 0; + for (i = 0; i < 1000000 + 1; i++) { + memset(buf, 0, 4096); + fgets(buf, 4096, f); + if (strcmp(buf, "Hello, world\n") == 0) + done++; + else + fprintf(stderr, "# wrong line %zu: %s", + i, buf); + } + is(done, 1000000 + 1, "all records were written properly"); + + is(fgets(buf, 4096, f), NULL, "eof"); + isnt(feof(f), 0, "feof"); + is(fclose(f), 0, "fclose"); + } + if (fork() == 0) diff --git a/test/unit/fiob.result b/test/unit/fiob.result index 7b4e154541774825d820274ec7cfd154f68405d7..2cedf473e2d4c16595a053e2d2d5699b28e9e91e 100644 --- a/test/unit/fiob.result +++ b/test/unit/fiob.result @@ -1,31 +1,48 @@ -1..30 +1..47 ok 1 - tempdir is created -ok 2 - malloc -ok 3 - common open -ok 4 - Hello world is written (12 bytes) -ok 5 - current position -ok 6 - set new position -ok 7 - current position 0 -ok 8 - Hello world is read (12 bytes) -ok 9 - data -ok 10 - set new position -ok 11 - Hello world is read (12 bytes) -ok 12 - data +ok 2 - common open +ok 3 - Hello world is written (12 bytes) +ok 4 - current position +ok 5 - set new position +ok 6 - current position 0 +ok 7 - Hello world is read (12 bytes) +ok 8 - data +ok 9 - set new position +ok 10 - Hello world is read (12 bytes) +ok 11 - data +ok 12 - set new position ok 13 - set new position -ok 14 - set new position +ok 14 - data is read ok 15 - data is read -ok 16 - data is read -ok 17 - fclose -ok 18 - reopened file -ok 19 - move pos at finish -ok 20 - file size -ok 21 - fclose -ok 22 - common open: O_EXCL -ok 23 - common open -ok 24 - Hello world is written (12 bytes) -ok 25 - move pos -ok 26 - Hello world is written (12 bytes) -ok 27 - move pos -ok 28 - read 11 bytes -ok 29 - content was read -ok 30 - fclose +ok 16 - fclose +ok 17 - reopened file +ok 18 - move pos at finish +ok 19 - file size +ok 20 - fclose +ok 21 - common open: O_EXCL +ok 22 - common open +ok 23 - Hello world is written (12 bytes) +ok 24 - move pos +ok 25 - Hello world is written (12 bytes) +ok 26 - move pos +ok 27 - read 11 bytes +ok 28 - content was read +ok 29 - fclose +ok 30 - open big file +ok 31 - Hello world is written (13 bytes) +ok 32 - all bytes were written +ok 33 - fclose +ok 34 - reopen file for reading +ok 35 - all records were written properly +ok 36 - eof +ok 37 - feof +ok 38 - fclose +ok 39 - open big file +ok 40 - Hello world is written (13 bytes) +ok 41 - all bytes were written +ok 42 - fclose +ok 43 - reopen file for reading +ok 44 - all records were written properly +ok 45 - eof +ok 46 - feof +ok 47 - fclose