diff --git a/client/tarantar/indexate.c b/client/tarantar/indexate.c index f423895be30e590bda50cf6c76b6c1b3381d569b..0e4033057c01f2fd17b3e8bc248df90778ada20b 100644 --- a/client/tarantar/indexate.c +++ b/client/tarantar/indexate.c @@ -76,7 +76,7 @@ snapshot_process_row(struct ts_spaces *s, int fileid, int offset, mh_int_t pos = mh_pk_put(space->index, &node, NULL, space); if (pos == mh_end(space->index)) { - free(k); + ts_space_keyfree(space, k); return -1; } return 0; @@ -209,17 +209,18 @@ xlog_process_row(struct ts_spaces *s, int fileid, int offset, struct tnt_request case TNT_OP_INSERT: pos = mh_pk_put(space->index, &node, NULL, space); if (pos == mh_end(space->index)) { - free(k); + ts_space_keyfree(space, k); return -1; } + ts_oomcheck(); break; case TNT_OP_DELETE: { pos = mh_pk_get(space->index, &node, space); assert(pos != mh_end(space->index)); struct ts_key *key = *mh_pk_node(space->index, pos); mh_pk_del(space->index, pos, space); - free(key); - free(k); + ts_space_keyfree(space, key); + ts_space_keyfree(space, k); break; } case TNT_OP_UPDATE: { @@ -237,13 +238,13 @@ xlog_process_row(struct ts_spaces *s, int fileid, int offset, struct tnt_request old = tnt_tuple_set(NULL, key->key + space->key_size + sizeof(uint32_t), size); if (old == NULL) { - free(k); + ts_space_keyfree(space, k); return -1; } } else { /* load from file */ if (ts_cursor_open(&cursor, key)) { - free(k); + ts_space_keyfree(space, k); return -1; } old = ts_cursor_tuple(&cursor); @@ -252,8 +253,8 @@ xlog_process_row(struct ts_spaces *s, int fileid, int offset, struct tnt_request /* remove key tuple from index, due to possibility of the key * being changed by update */ mh_pk_del(space->index, pos, space); - free(k); - free(key); + ts_space_keyfree(space, k); + ts_space_keyfree(space, key); /* free old key */ struct tnt_tuple *n = ts_update(r, old); @@ -278,7 +279,7 @@ xlog_process_row(struct ts_spaces *s, int fileid, int offset, struct tnt_request node = k; pos = mh_pk_put(space->index, &node, NULL, space); if (pos == mh_end(space->index)) { - free(k); + ts_space_keyfree(space, k); return -1; } break; diff --git a/client/tarantar/main.c b/client/tarantar/main.c index efc5f8277a41923d85ee305d5024729af59a4973..c2a5c5bb83a009262bd305c9050ac285f97fab83 100644 --- a/client/tarantar/main.c +++ b/client/tarantar/main.c @@ -33,6 +33,9 @@ #include <stdint.h> #include <stdio.h> #include <string.h> +#include <time.h> +#include <unistd.h> +#include <malloc.h> #include <connector/c/include/tarantool/tnt.h> @@ -54,14 +57,21 @@ struct ts tss; -static int +static void ts_init(void) +{ + ts_options_init(&tss.opts); + memset(&tss.s, 0, sizeof(tss.s)); + memset(&tss.ra, 0, sizeof(tss.ra)); + memset(&tss.sc, 0, sizeof(tss.sc)); +} + +static int +ts_prepare(void) { int rc = ts_reftable_init(&tss.rt); if (rc == -1) return -1; - ts_options_init(&tss.opts); - memset(&tss.s, 0, sizeof(tss.s)); tss.last_snap_lsn = 0; tss.last_xlog_lsn = 0; slab_cache_create(&tss.sc); @@ -72,19 +82,40 @@ ts_init(void) static void ts_free(void) { - ts_options_free(&tss.opts); - ts_space_free(&tss.s); ts_reftable_free(&tss.rt); region_free(&tss.ra); slab_cache_destroy(&tss.sc); } -int main(int argc, char *argv[]) +static void +ts_shutdown(void) { - int rc = ts_init(); - if (rc == -1) - return 1; + ts_space_free(&tss.s); + ts_options_free(&tss.opts); + ts_free(); +} +void +ts_oomcheck(void) +{ +#ifdef __linux__ + struct mallinfo mi = mallinfo(); + if (tss.opts.limit > 0 && mi.uordblks > tss.opts.limit) { + printf("\nmemory limit reached (%"PRIu64")\n", tss.opts.limit); + exit(2); + } + return; +#else + if (tss.opts.limit > 0 && tss.alloc > tss.opts.limit) { + printf("\nmemory limit reached (%"PRIu64")\n", tss.opts.limit); + exit(2); + } +#endif +} + +int main(int argc, char *argv[]) +{ + ts_init(); /* parse arguments */ switch (ts_options_process(&tss.opts, argc, argv)) { case TS_MODE_USAGE: @@ -98,34 +129,56 @@ int main(int argc, char *argv[]) } /* load configuration file */ - rc = ts_config_load(&tss.opts); - if (rc == -1) - goto done; + int rc = ts_config_load(&tss.opts); + if (rc == -1) { + ts_options_free(&tss.opts); + return 1; + } + if (tss.opts.cfg.snap_dir == NULL) { + printf("snap_dir and wal_dir must be defined.\n"); + ts_options_free(&tss.opts); + return 1; + } /* create spaces */ rc = ts_space_init(&tss.s); - if (rc == -1) - goto done; + if (rc == -1) { + ts_space_free(&tss.s); + ts_options_free(&tss.opts); + return 1; + } rc = ts_space_fill(&tss.s, &tss.opts); - if (rc == -1) - goto done; + if (rc == -1) { + ts_space_free(&tss.s); + ts_options_free(&tss.opts); + return 1; + } - printf("work_dir: %s\n", tss.opts.cfg.work_dir); printf("snap_dir: %s\n", tss.opts.cfg.snap_dir); printf("wal_dir: %s\n", tss.opts.cfg.wal_dir); printf("spaces: %d\n", mh_size(tss.s.t)); - - /* indexate snapshot and xlog data */ - rc = ts_indexate(); - if (rc == -1) - goto done; - /* write snapshot */ - rc = ts_snapshot_create(); - if (rc == -1) - goto done; - printf("complete.\n"); - + printf("interval: %d\n", tss.opts.interval); + printf("memory_limit: %dM\n", (int)(tss.opts.limit / 1024 / 1024)); + + do { + time_t tm = time(NULL); + printf("\nSTART SNAPSHOTTING %s\n", ctime(&tm)); + rc = ts_prepare(); + if (rc == -1) + goto done; + /* indexate snapshot and xlog data */ + rc = ts_indexate(); + if (rc == -1) + goto done; + /* write snapshot */ + rc = ts_snapshot_create(); + if (rc == -1) + goto done; + ts_free(); + ts_space_recycle(&tss.s); + sleep(tss.opts.interval); + } while (tss.opts.interval > 0); done: - ts_free(); + ts_shutdown(); return (rc == -1 ? 1 : 0); } diff --git a/client/tarantar/options.c b/client/tarantar/options.c index b61c729c907f8bdf951278188cf834a697cf221f..1d20e13a186cf2fd4ddd060ef0604347d271afd5 100644 --- a/client/tarantar/options.c +++ b/client/tarantar/options.c @@ -44,8 +44,12 @@ static const void *opts_def = gopt_start( gopt_option('c', 0, gopt_shorts('c'), gopt_longs("create"), NULL, "create snapshot file"), - gopt_option('l', GOPT_ARG, gopt_shorts('l'), + gopt_option('i', GOPT_ARG, gopt_shorts('i'), + gopt_longs("interval"), " <sec>", "periodically create snapshot"), + gopt_option('n', GOPT_ARG, gopt_shorts('n'), gopt_longs("lsn"), " <u64>", "snapshot lsn (latest by default)"), + gopt_option('l', GOPT_ARG, gopt_shorts('l'), + gopt_longs("limit"), " <limit>", "memory limit (bytes)"), gopt_option('?', 0, gopt_shorts(0), gopt_longs("help"), NULL, "display this help and exit"), gopt_option('v', 0, gopt_shorts('v'), gopt_longs("version"), @@ -63,8 +67,9 @@ void ts_options_free(struct ts_options *opts) { int ts_options_usage(void) { - printf("Tarantool XLOG compression utility.\n"); - printf("Usage: tarantar <options> <tarantool_config>\n\n"); + printf("Tarantool xlog compression utility.\n\n"); + + printf("Usage: tarantar <options> <tarantool_config>\n"); gopt_help(opts_def); return 1; } @@ -74,7 +79,7 @@ ts_options_process(struct ts_options *opts, int argc, char **argv) { void *opt = gopt_sort(&argc, (const char**)argv, opts_def); /* usage */ - if (gopt(opt, '?') || argc != 2) { + if (gopt(opt, '?')) { opts->mode = TS_MODE_USAGE; goto done; } @@ -86,10 +91,19 @@ ts_options_process(struct ts_options *opts, int argc, char **argv) /* lsn */ const char *arg = NULL; - if (gopt_arg(opt, 'l', &arg)) { + if (gopt_arg(opt, 'n', &arg)) { opts->to_lsn = atoll(arg); opts->to_lsn_set = 1; } + + /* limit */ + if (gopt_arg(opt, 'l', &arg)) + opts->limit = strtoll(arg, NULL, 10); + + /* sleep */ + if (gopt_arg(opt, 'i', &arg)) + opts->interval = atoi(arg); + /* generate or verify */ if (gopt(opt, 'c')) { opts->mode = TS_MODE_CREATE; diff --git a/client/tarantar/options.h b/client/tarantar/options.h index d571aaa7a6daa4f329c3a7e0b411c564e1645c65..a7b1da7b96ad27004e57ced8f264ad95d0f3cf04 100644 --- a/client/tarantar/options.h +++ b/client/tarantar/options.h @@ -8,8 +8,10 @@ enum ts_options_mode { }; struct ts_options { + uint64_t limit; enum ts_options_mode mode; int to_lsn_set; + int interval; uint64_t to_lsn; const char *file_config; struct tarantool_cfg cfg; diff --git a/client/tarantar/ref.c b/client/tarantar/ref.c index 6e5df497c986d81cf5594c70cbd9fe391f0efc47..a3f5e49c490d18e8641f88d80fa731a367e797da 100644 --- a/client/tarantar/ref.c +++ b/client/tarantar/ref.c @@ -20,7 +20,10 @@ int ts_reftable_init(struct ts_reftable *t) void ts_reftable_free(struct ts_reftable *t) { - free(t->r); + if (t->r) { + free(t->r); + t->r = NULL; + } } int ts_reftable_add(struct ts_reftable *t, char *file, int is_snap) diff --git a/client/tarantar/snapshot.c b/client/tarantar/snapshot.c index bd260ceb8081be93327b6c9d699f0b9ac0f1de2e..61958729a7efa82202dc815543d6c7cb95fd35dc 100644 --- a/client/tarantar/snapshot.c +++ b/client/tarantar/snapshot.c @@ -6,6 +6,7 @@ #include <stdio.h> #include <string.h> #include <assert.h> +#include <unistd.h> #include <third_party/crc32.h> @@ -138,13 +139,13 @@ int ts_snapshot_create(void) unsigned long long snap_lsn = tss.last_xlog_lsn; - if (tss.last_snap_lsn == snap_lsn) { - printf("snapshot already exists.\n"); + if (snap_lsn == 0 || tss.last_snap_lsn == snap_lsn) { + printf("snapshot exists, skip.\n"); return 0; } char path[1024]; - snprintf(path, sizeof(path), "%s/%020llu.snap", tss.opts.cfg.snap_dir, + snprintf(path, sizeof(path), "%s/%020llu.snap.inprocess", tss.opts.cfg.snap_dir, (unsigned long long) snap_lsn); FILE *snapshot = fopen(path, "a"); @@ -227,10 +228,24 @@ int ts_snapshot_create(void) printf("failed to write row\n"); goto error; } + if (fflush(snapshot) != 0) { + printf("flush failed\n"); + goto error; + } + if (fsync(fileno(snapshot)) != 0) { + printf("sync failed\n"); + goto error; + } if (fclose(snapshot) != 0) { printf("failed to write row\n"); } + char newpath[1024]; + strncpy(newpath, path, sizeof(newpath)); + char *ext = strrchr(newpath, '.'); + *ext = 0; + rename(path, newpath); + tnt_log_close(¤t); printf("\n"); return 0; diff --git a/client/tarantar/space.c b/client/tarantar/space.c index 3e9513010e01a97677b46bbd328f181d4abb7b7d..82ecbc9270b3cda3e68c63e17102d41888d99d75 100644 --- a/client/tarantar/space.c +++ b/client/tarantar/space.c @@ -39,11 +39,20 @@ #include <cfg/prscfg.h> #include <cfg/tarantool_box_cfg.h> +#include <lib/small/region.h> + #include "key.h" #include "hash.h" #include "options.h" +#include "config.h" #include "space.h" +#include "ref.h" +#include "ts.h" #include "sha1.h" +#include "indexate.h" +#include "snapshot.h" + +extern struct ts tss; int ts_space_init(struct ts_spaces *s) { s->t = mh_u32ptr_new(); @@ -52,9 +61,27 @@ int ts_space_init(struct ts_spaces *s) { return 0; } +void ts_space_recycle(struct ts_spaces *s) +{ + mh_int_t i; + mh_foreach(s->t, i) { + struct ts_space *space = mh_u32ptr_node(s->t, i)->val; + mh_int_t pos = 0; + while (pos != mh_end(space->index)) { + if (mh_exist((space->index), pos)) { + struct ts_key *k = + *mh_pk_node(space->index, pos); + free(k); + } + pos++; + } + mh_pk_delete(space->index); + space->index = mh_pk_new(); + } +} + void ts_space_free(struct ts_spaces *s) { - return; mh_int_t i; mh_foreach(s->t, i) { struct ts_space *space = mh_u32ptr_node(s->t, i)->val; @@ -316,11 +343,24 @@ struct ts_key* ts_space_keyalloc(struct ts_space *s, struct tnt_tuple *t, int fileid, int offset, int attach) { + struct ts_key *k = NULL; switch (s->c) { case TS_SPACE_COMPACT_CHECKSUM: - return ts_space_keyalloc_sha(s, t, fileid, offset, attach); + k = ts_space_keyalloc_sha(s, t, fileid, offset, attach); + break; case TS_SPACE_COMPACT_SPARSE: - return ts_space_keyalloc_sparse(s, t, fileid, offset, attach); + k = ts_space_keyalloc_sparse(s, t, fileid, offset, attach); + break; } - return NULL; + if (k == NULL) + return NULL; + tss.alloc += ts_space_keysize(s, k); + ts_oomcheck(); + return k; +} + +void +ts_space_keyfree(struct ts_space *s, struct ts_key *k) { + tss.alloc -= ts_space_keysize(s, k); + free(k); } diff --git a/client/tarantar/space.h b/client/tarantar/space.h index 4593835c16daf5ec9a6de57e0c064b17eae73822..a338b9101c1af38195670ed3bb28b0bc6652792b 100644 --- a/client/tarantar/space.h +++ b/client/tarantar/space.h @@ -39,6 +39,7 @@ struct ts_spaces { int ts_space_init(struct ts_spaces *s); void ts_space_free(struct ts_spaces *s); +void ts_space_recycle(struct ts_spaces *s); struct ts_space *ts_space_create(struct ts_spaces *s, uint32_t id); struct ts_space *ts_space_match(struct ts_spaces *s, uint32_t id); @@ -49,4 +50,15 @@ struct ts_key* ts_space_keyalloc(struct ts_space *s, struct tnt_tuple *t, int fileid, int offset, int attach); +void +ts_space_keyfree(struct ts_space *s, struct ts_key *k); + +static inline size_t +ts_space_keysize(struct ts_space *s, struct ts_key *k) { + size_t size = sizeof(struct ts_key) + s->key_size; + if (k->flags == TS_KEY_WITH_DATA) + size += sizeof(uint32_t) + *(uint32_t*)(k->key + s->key_size); + return size; +} + #endif diff --git a/client/tarantar/ts.h b/client/tarantar/ts.h index a385dbc1d4acda0a5265f608f884fc4c4ff6ec15..bbce89344ac0174c6fd6eb2e8b3fbc2ffa0879cb 100644 --- a/client/tarantar/ts.h +++ b/client/tarantar/ts.h @@ -8,9 +8,12 @@ struct ts { uint64_t last_snap_lsn; uint64_t last_xlog_lsn; int to_lsn_set; + uint64_t alloc; uint64_t to_lsn; struct slab_cache sc; struct region ra; }; +void ts_oomcheck(void); + #endif diff --git a/connector/c/tntrpl/tnt_dir.c b/connector/c/tntrpl/tnt_dir.c index ed26decc05b9452093b890dcf5d0381b1530b0d6..3bd113f01e5b8d918526a219402f28a19669daf4 100644 --- a/connector/c/tntrpl/tnt_dir.c +++ b/connector/c/tntrpl/tnt_dir.c @@ -116,8 +116,7 @@ int tnt_dir_scan(struct tnt_dir *d, char *path) { switch (d->type) { case TNT_DIR_XLOG: - if (strcmp(ext, ".xlog") != 0 && - strcmp(ext, ".xlog.inprogress") != 0) + if (strcmp(ext, ".xlog") != 0) continue; break; case TNT_DIR_SNAPSHOT: