diff --git a/core/exception.m b/core/exception.m index 955c91c86aec3c6629b760d4382f7bf2f6190286..8580e1ee693b291dac714ccb50f1d7603124795b 100644 --- a/core/exception.m +++ b/core/exception.m @@ -85,7 +85,6 @@ @implementation IllegalParams - (id) init: (const char*) msg { - printf("IllegalParams init\n"); return [super init: ER_ILLEGAL_PARAMS, msg]; } @end diff --git a/core/fiber.m b/core/fiber.m index 10601eb184b18f7bbc77f3bbeca2aaa5cddb4e4e..0b9cb25d10bb39a6acba8a8b6faa796ba7a52245 100644 --- a/core/fiber.m +++ b/core/fiber.m @@ -64,8 +64,6 @@ static struct fiber **sp, *call_stack[64]; static uint32_t last_used_fid; static struct palloc_pool *ex_pool; -static uint32_t watermark = 0; - struct fiber_cleanup { void (*handler) (void *data); void *data; @@ -442,12 +440,9 @@ fiber_loop(void *data __attribute__((unused))) @catch (FiberCancelException *e) { say_info("fiber `%s' has been cancelled", fiber->name); - if (fiber->waiter != NULL) { + if (fiber->waiter != NULL) fiber_call(fiber->waiter); - fiber->waiter = NULL; - } - say_info("fiber `%s': exiting", fiber->name); } @catch (id e) { @@ -1230,7 +1225,7 @@ void fiber_init(void) { SLIST_INIT(&fibers); - fibers_registry = kh_init(fid2fiber, &watermark); + fibers_registry = kh_init(fid2fiber, NULL); ex_pool = palloc_create_pool("ex_pool"); diff --git a/core/iproto.m b/core/iproto.m index fe572d10a3568a3e0de1a25c5a551f5bd3bedbcd..3e7a5a9e6304cf8feaa861e5b98807b11e3ac137 100644 --- a/core/iproto.m +++ b/core/iproto.m @@ -40,7 +40,7 @@ const uint32_t msg_ping = 0xff00; static void iproto_reply(iproto_callback callback, struct tbuf *request); void -iproto_interact(iproto_callback callback) +iproto_interact(iproto_callback *callback) { struct tbuf *in = fiber->rbuf; ssize_t to_read = sizeof(struct iproto_header); @@ -56,7 +56,7 @@ iproto_interact(iproto_callback callback) break; struct tbuf *request = tbuf_split(in, request_len); - iproto_reply(callback, request); + iproto_reply(*callback, request); to_read = sizeof(struct 
iproto_header) - in->len; diff --git a/core/log_io.m b/core/log_io.m index 8239bd6df6889b7d7c7ef1e9fb06e1141c8738f4..1bb54b1d934ec1ea7de6694f2a4a4493e0564852 100644 --- a/core/log_io.m +++ b/core/log_io.m @@ -71,6 +71,15 @@ struct log_io_iter { int io_rate_limit; }; + +void +wait_lsn_set(struct wait_lsn *wait_lsn, i64 lsn) +{ + assert(wait_lsn->waiter == NULL); + wait_lsn->waiter = fiber; + wait_lsn->lsn = lsn; +} + int confirm_lsn(struct recovery_state *r, i64 lsn) { @@ -82,6 +91,14 @@ confirm_lsn(struct recovery_state *r, i64 lsn) " new:%" PRIi64 " diff: %" PRIi64, r->confirmed_lsn, lsn, lsn - r->confirmed_lsn); r->confirmed_lsn = lsn; + /* Alert the waiter, if any. There can be holes in + * confirmed_lsn, in case of disk write failure, + * but wal_writer never confirms LSNs out of order. + */ + if (r->wait_lsn.waiter && r->confirmed_lsn >= r->wait_lsn.lsn) { + fiber_call(r->wait_lsn.waiter); + } + return 0; } else { say_warn("lsn double confirmed:%" PRIi64, r->confirmed_lsn); @@ -90,6 +107,23 @@ confirm_lsn(struct recovery_state *r, i64 lsn) return -1; } + +/** Wait until the given LSN makes its way to disk. 
*/ + +void +recovery_wait_lsn(struct recovery_state *r, i64 lsn) +{ + while (lsn > r->confirmed_lsn) { + wait_lsn_set(&r->wait_lsn, lsn); + @try { + yield(); + } @finally { + wait_lsn_clear(&r->wait_lsn); + } + } +} + + i64 next_lsn(struct recovery_state *r, i64 new_lsn) { @@ -1015,7 +1049,7 @@ recover(struct recovery_state *r, i64 lsn) i64 next_lsn = r->confirmed_lsn + 1; i64 lsn = find_including_file(r->wal_class, next_lsn); if (lsn <= 0) { - say_error("can't find wal containing record with lsn:%" PRIi64, next_lsn); + say_error("can't find WAL containing record with lsn:%" PRIi64, next_lsn); result = -1; goto out; } @@ -1161,7 +1195,8 @@ write_to_disk(void *_state, struct tbuf *t) &unused); } else if (wal->rows == 1) { - /* rename wal after first successfull write to name without inprogress suffix*/ + /* rename WAL after first successful write to name + * without inprogress suffix*/ if (inprogress_log_rename(wal->filename) != 0) { say_error("can't rename inprogress wal"); goto fail; @@ -1282,6 +1317,7 @@ recover_init(const char *snap_dirname, const char *wal_dirname, r->wal_class = xlog_class_create(wal_dirname); r->wal_class->rows_per_file = rows_per_file; r->wal_class->fsync_delay = fsync_delay; + wait_lsn_clear(&r->wait_lsn); if ((flags & RECOVER_READONLY) == 0) r->wal_writer = spawn_child("wal_writer", inbox_size, write_to_disk, r); diff --git a/core/log_io_remote.m b/core/log_io_remote.m index c64c0f6aa2e452cb30cef916ba07e16bb0d1be65..47637b9db5d9a0b8284217897e667dd435cd4413 100644 --- a/core/log_io_remote.m +++ b/core/log_io_remote.m @@ -102,8 +102,8 @@ remote_read_row(i64 initial_lsn) goto err; } - say_crit("succefully connected to feeder"); - say_crit("starting remote recovery from lsn:%" PRIi64, initial_lsn); + say_crit("successfully connected to feeder"); + say_crit("starting replication from lsn:%" PRIi64, initial_lsn); warning_said = false; err = NULL; } diff --git a/include/iproto.h b/include/iproto.h index 
a4f4fc12b109d71bc3da8990c0c04c12474e4246..34d7fa69f9a1285991a8efed45ca55ee9e34a4b6 100644 --- a/include/iproto.h +++ b/include/iproto.h @@ -54,6 +54,6 @@ static inline struct iproto_header *iproto(const struct tbuf *t) typedef void (*iproto_callback) (uint32_t msg_code, struct tbuf *request); -void iproto_interact(iproto_callback callback); +void iproto_interact(iproto_callback *callback); #endif diff --git a/include/log_io.h b/include/log_io.h index f47601e0461979ef8e89c22fe2ba062655320912..55a1a1655000118164d41d47e22cd75e60a26299 100644 --- a/include/log_io.h +++ b/include/log_io.h @@ -65,6 +65,26 @@ struct log_io_class { const char *dirname; }; + +/** A "condition variable" that allows fibers to wait when a given + * LSN makes it to disk. + */ + +struct wait_lsn { + struct fiber *waiter; + i64 lsn; +}; + +void +wait_lsn_set(struct wait_lsn *wait_lsn, i64 lsn); + +inline static void +wait_lsn_clear(struct wait_lsn *wait_lsn) +{ + wait_lsn->waiter = NULL; + wait_lsn->lsn = 0LL; +} + struct log_io { struct log_io_class *class; FILE *f; @@ -95,6 +115,7 @@ struct recovery_state { int snap_io_rate_limit; u64 cookie; + struct wait_lsn wait_lsn; bool finalize; @@ -142,6 +163,7 @@ void recovery_setup_panic(struct recovery_state *r, bool on_snap_error, bool on_ int confirm_lsn(struct recovery_state *r, i64 lsn); int64_t next_lsn(struct recovery_state *r, i64 new_lsn); +void recovery_wait_lsn(struct recovery_state *r, i64 lsn); int read_log(const char *filename, row_handler xlog_handler, row_handler snap_handler, void *state); diff --git a/mod/box/box.m b/mod/box/box.m index 30af659d5deda44818b058c3757bf991316761f4..d66841ada1a74a59a62409a8f2a5d066abb9cd7c 100644 --- a/mod/box/box.m +++ b/mod/box/box.m @@ -47,10 +47,13 @@ #include <cfg/tarantool_box_cfg.h> #include <mod/box/index.h> +static void box_process_ro(u32 op, struct tbuf *request_data); +static void box_process_rw(u32 op, struct tbuf *request_data); + const char *mod_name = "Box"; -bool box_updates_allowed = 
false; -static char *status = "unknown"; +iproto_callback rw_callback = box_process_ro; +static char status[64] = "unknown"; static int stat_base; STRS(messages, MESSAGES); @@ -761,9 +764,11 @@ txn_commit(struct box_txn *txn) tbuf_append(t, txn->req.data, txn->req.len); i64 lsn = next_lsn(recovery_state, 0); - if (!wal_write(recovery_state, wal_tag, fiber->cookie, lsn, t)) - tnt_raise(LoggedError, :ER_WAL_IO); + bool res = !wal_write(recovery_state, wal_tag, + fiber->cookie, lsn, t); confirm_lsn(recovery_state, lsn); + if (res) + tnt_raise(LoggedError, :ER_WAL_IO); } unlock_tuples(txn); @@ -1150,9 +1155,6 @@ box_process_ro(u32 op, struct tbuf *request_data) static void box_process_rw(u32 op, struct tbuf *request_data) { - if (!op_is_select(op) && !box_updates_allowed) - tnt_raise(LoggedError, :ER_NONMASTER); - return box_process(txn_alloc(0), op, request_data); } @@ -1237,8 +1239,7 @@ remote_recovery_restart(struct tarantool_cfg *conf) conf->wal_feeder_port, default_remote_row_handler); - status = palloc(eter_pool, 64); - snprintf(status, 64, "replica/%s:%i%s", conf->wal_feeder_ipaddr, + snprintf(status, sizeof(status), "replica/%s:%i%s", conf->wal_feeder_ipaddr, conf->wal_feeder_port, custom_proc_title); title("replica/%s:%i%s", conf->wal_feeder_ipaddr, conf->wal_feeder_port, custom_proc_title); @@ -1248,23 +1249,24 @@ static void box_master_or_slave(struct tarantool_cfg *conf) { if (conf->remote_hot_standby) { - box_updates_allowed = false; + rw_callback = box_process_ro; + + recovery_wait_lsn(recovery_state, recovery_state->lsn); remote_recovery_restart(conf); } else { if (remote_recover) { - say_info("shuting down the replica"); + say_info("shutting down the replica"); fiber_cancel(remote_recover); remote_recover = NULL; } + rw_callback = box_process_rw; - say_info("I am primary"); - - box_updates_allowed = true; - - status = "primary"; + snprintf(status, sizeof(status), "primary"); title("primary"); + + say_info("I am primary"); } } @@ -1329,6 +1331,8 @@ 
mod_reload_config(struct tarantool_cfg *old_conf, struct tarantool_cfg *new_conf void mod_init(void) { + static iproto_callback ro_callback = box_process_ro; + stat_base = stat_register(messages_strs, messages_MAX); namespace = palloc(eter_pool, sizeof(struct namespace) * namespace_count); @@ -1417,7 +1421,7 @@ mod_init(void) if (cfg.local_hot_standby) { say_info("starting local hot standby"); recover_follow(recovery_state, cfg.wal_dir_rescan_delay); - status = "hot_standby"; + snprintf(status, sizeof(status), "hot_standby"); title("hot_standby"); } @@ -1428,15 +1432,13 @@ mod_init(void) if (cfg.secondary_port != 0) fiber_server(tcp_server, cfg.secondary_port, (fiber_server_callback) iproto_interact, - box_process_ro, NULL); + &ro_callback, NULL); if (cfg.primary_port != 0) fiber_server(tcp_server, cfg.primary_port, (fiber_server_callback) iproto_interact, - box_process_rw, box_bound_to_primary); + &rw_callback, box_bound_to_primary); } - - say_info("initialized"); } int diff --git a/test/box/stat.result b/test/box/stat.result index a6d113387ab37fc5a7b9720380a649805e8a8db5..cbcff934202c93dc5262a6c0899db1e152cd16d2 100644 --- a/test/box/stat.result +++ b/test/box/stat.result @@ -49,3 +49,23 @@ statistics: UPDATE_FIELDS: { rps: 0 , total: 0 } DELETE: { rps: 0 , total: 0 } ... 
+delete from t0 where k0 = 0 +Delete OK, 1 row affected +delete from t0 where k0 = 1 +Delete OK, 1 row affected +delete from t0 where k0 = 2 +Delete OK, 1 row affected +delete from t0 where k0 = 3 +Delete OK, 1 row affected +delete from t0 where k0 = 4 +Delete OK, 1 row affected +delete from t0 where k0 = 5 +Delete OK, 1 row affected +delete from t0 where k0 = 6 +Delete OK, 1 row affected +delete from t0 where k0 = 7 +Delete OK, 1 row affected +delete from t0 where k0 = 8 +Delete OK, 1 row affected +delete from t0 where k0 = 9 +Delete OK, 1 row affected diff --git a/test/box/stat.test b/test/box/stat.test index ef6140449bfee52867910b6914a82e6724ecd116..1e8140d07fa775a46d53582df817e0068984e414 100644 --- a/test/box/stat.test +++ b/test/box/stat.test @@ -21,4 +21,9 @@ print """# # """ exec admin "show stat" + +# cleanup +for i in range(10): + exec sql "delete from t0 where k0 = {0}".format(i) + # vim: syntax=python