From 4cb7c9f5a23bbc8aff69b55e8b93208a748174ac Mon Sep 17 00:00:00 2001 From: Roman Tsisyk <roman@tsisyk.com> Date: Mon, 24 Mar 2014 17:15:39 +0400 Subject: [PATCH] Use MsgPack for xlog/snaps files and IPROTO for replication * Change format of xlog/snap files to MsgPack-based format * Update replication to use IPROTO protocol * Rework replica bootstrap to process snapshot by rows * Add support for composite rows in fio_batch * Fix iproto->sync in SUBSCRIBE command --- src/CMakeLists.txt | 2 +- src/admin.cc | 5 +- src/bootstrap.snap | Bin 1627 -> 1187 bytes src/box/box.cc | 59 ++++++----- src/box/lua/call.cc | 4 +- src/box/request.cc | 49 ++++----- src/box/request.h | 16 ++- src/box/txn.cc | 34 +++++-- src/box/txn.h | 9 +- src/fio.c | 55 ++++++---- src/fio.h | 17 +++- src/iproto.cc | 124 +++++++---------------- src/iproto_constants.c | 68 ------------- src/iproto_constants.cc | 193 ++++++++++++++++++++++++++++++++++++ src/iproto_constants.h | 64 +++++++++++- src/iproto_port.cc | 14 +-- src/iproto_port.h | 9 +- src/log_io.cc | 161 +++++++++++++++++++----------- src/log_io.h | 39 ++------ src/recovery.cc | 153 +++++++++++++--------------- src/recovery.h | 24 +++-- src/replica.cc | 144 ++++++++++++++++++--------- src/replica.h | 4 +- src/replication.cc | 138 ++++++++++++++++++-------- src/replication.h | 2 +- src/session.cc | 9 +- test/box/bad_trigger.result | 7 +- test/box/dup_key1.xlog | Bin 281 -> 247 bytes test/box/dup_key2.xlog | Bin 144 -> 124 bytes test/box/unfinished.xlog | Bin 79 -> 122 bytes 30 files changed, 845 insertions(+), 558 deletions(-) delete mode 100644 src/iproto_constants.c create mode 100644 src/iproto_constants.cc diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index d1f8cac069..2a8220a154 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -60,7 +60,7 @@ set (common_sources admin.cc replica.cc iproto.cc - iproto_constants.c + iproto_constants.cc iproto_port.cc session.cc object.cc diff --git a/src/admin.cc b/src/admin.cc index 15ee0af324..ae2655433c 100644 --- a/src/admin.cc +++ b/src/admin.cc @@ -89,8 +89,9 @@ admin_handler(va_list ap) * stored procedures. */ - session_set_user(session_create(coio.fd, *(uint64_t *) addr), - ADMIN, ADMIN); + struct session *session = session_create(coio.fd, *(uint64_t *) addr); + session_set_user(session, ADMIN, ADMIN); + trigger_run(&session_on_connect, NULL); for (;;) { if (admin_dispatch(&coio, iobuf, L) < 0) diff --git a/src/bootstrap.snap b/src/bootstrap.snap index 70257782f59a443cdd8e51a3358586f571749bbd..6ee1bc4150098ff3370e4627f37e8d35b969682b 100644 GIT binary patch literal 1187 zcmZ|N&1(};6b116CfaK1$M}&Fq}2(E4HU|>LWu=IH*q1<Le~;Q`l>-^Qf4x>f)FU8 zn=X_hlqjW`P9v675jVQhKOk>mD6Vx;P!Xhvbm^O!PCSY2{{G&%oOj1ZFO5a@fx)3@ zbp3tTYMjLLzW$Tdb0wmb^JSKl2%A*}5mOWM)oE_o`GUzhywL=S&Bf7A{^(Z8A60_U zc;ZQ&sjQl@b2qt>bq#J5XUH=eiSrl72GK;o#Mw&5p33HUE7_Rq89=fJh$czioZvG+ zPLX(frK};@3na<h%%o%HTE&HhSRaZG5V{nO&8=3^b8qW2icSzIDW<IaG>Ee#V(C{W zeDUJVSFhTAfzT2QvM;JiU}UYk%SDTG#;9T!Ew3~Q*U~Q+5$*>V{~wx;;lVQ`uDu?9 zgfNPbxrI9>xBNU}tg_8clPL6F|A?#$nX+tV4Zgjl5^mW~;zxOP9MyqPmFrAv+fpGf zu9=x6(MUfWL46Q4W(KqwEbFPvHHMKM3aRWh*%mB&{Z~)?@%<r$hY{{x&+QMFH;}2w za8-s|etYoiDXJr&^j)`<zw!I~TFcoSMYwaEPNMT_?GMHs3)QnaOSQ#C{EzULjbJ^S z)(SpxJbXg-DldG;>**#zw@QV8z4OmJ7BD?APfP9%hucN(I_2O<qt=Uq-OWL^<Yp&~ Kyt%H{e*OiRgr~It literal 1627 zcmWIca}3}z&@(jR;(EK9dsoF{nT3oD5TLeMCoJv66-QAZ_uqf0Aj9?Y``JtkO#<f_ z7#IZ<CoeBcEh^5;&tqf*Y4@7R-p_=lT@FRNrFQxin05)psb?7l7?;NvCugMQCN9lQ z%`K^50GYFod*^*-G;?H8%wfK9{V<9-5{%2@iwhEyQ<03}IKN7t1+Ov9``l|#j8Om@ zlbM&2T7hJY=*7cZSka7;L2=CtR*z*U#%M4ujZZ7hOGYxK^$+t@HoT_P+uk^fVu}IK zl+xnVA|zANCf|`|$7_nd6VoOXQ!Id{6clBaA(?XQrI;88nkmXCKJqSB34)oTpg02< zPYla}hU6v|RW2zhN=;>CWLR8WQUr>u+^cVdIPn@L_xHkd6vHGi4a+Od1sS$1&9s;c z%`in2ml?-Y&p|beacO>eUTP88T?~wji=js8b)9SFMl(tQ#i+b4y(m<pn3m)v=B7e@ z$OQ9YXe-|h9yF6QP)y>UX)qIE(yX(L3TQrLVgNY{?lw!MihN!)6ID@6oGTKMglZzP z+aSil9hft*NRkiDIMnzoaI}1mFb)`>8fXp##wRq~_MLlI!H;GbY7~^!rVF7Oh9e4o zymmMufM%2ekdG)%7Ttc<foc@8HyPnR%$>gEk|3H%s6KR`?|UD`Bm<m2ytj16T_H5X zP@U!Sx#=OQVc4Bjuc&xc7|kSIARpnuRap!D5hl$!%V>e-EM{<`0DF`f76q^JSPMna zj75!tt#flS5XJ)I6PU}f<^qdfGetoqI0(Q>bdZY?E>zaJqz^O7Kyfm|f~D!Dsl_Fr i+$bog8zu(Uic7Q3kBclwni&@?O-#wn%#&NaRsaB27~`%0 diff --git a/src/box/box.cc b/src/box/box.cc index fc376105d7..9e56cdcae9 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -63,16 +63,16 @@ box_process_func box_process = process_ro; static int stat_base; int snapshot_pid = 0; /* snapshot processes pid */ +static void +box_snapshot_cb(struct log_io *l); /** The snapshot row metadata repeats the structure of REPLACE request. */ -struct box_snap_row { - uint16_t op; +struct request_replace_body { uint8_t m_body; uint8_t k_space_id; uint8_t m_space_id; uint32_t v_space_id; uint8_t k_tuple; - char data[]; } __attribute__((packed)); void @@ -89,7 +89,7 @@ process_rw(struct port *port, struct request *request) struct txn *txn = txn_begin(); try { - stat_collect(stat_base, request->type, 1); + stat_collect(stat_base, request->code, 1); request->execute(request, txn, port); txn_commit(txn); port_send_tuple(port, txn); @@ -104,7 +104,7 @@ process_rw(struct port *port, struct request *request) static void process_replica(struct port *port, struct request *request) { - if (!iproto_request_is_select(request->type)) { + if (!iproto_request_is_select(request->code)) { tnt_raise(ClientError, ER_NONMASTER, cfg.replication_source); } @@ -114,21 +114,21 @@ process_replica(struct port *port, struct request *request) static void process_ro(struct port *port, struct request *request) { - if (!iproto_request_is_select(request->type)) + if (!iproto_request_is_select(request->code)) tnt_raise(LoggedError, ER_SECONDARY); return process_rw(port, request); } static int -recover_row(void *param __attribute__((unused)), const struct log_row *row) +recover_row(void *param __attribute__((unused)), struct iproto_packet *packet) { try { - const char *data = row->data; - const char *end = data + row->len; - uint16_t op = pick_u16(&data, end); + assert(packet->bodycnt == 1); /* always 1 for read */ struct request request; - request_create(&request, op); - request_decode(&request, data, end - data); + request_create(&request, packet->code); + request_decode(&request, (const char *) packet->body[0].iov_base, + packet->body[0].iov_len); + request.packet = packet; process_rw(&null_port, &request); } catch (Exception *e) { e->log(); @@ -302,7 +302,7 @@ box_init() /* recovery initialization */ recovery_init(cfg.snap_dir, cfg.wal_dir, - recover_row, NULL, cfg.rows_per_wal); + recover_row, NULL, box_snapshot_cb, cfg.rows_per_wal); recovery_update_io_rate_limit(recovery_state, cfg.snap_io_rate_limit); recovery_setup_panic(recovery_state, cfg.panic_on_snap_error, cfg.panic_on_wal_error); @@ -327,18 +327,25 @@ static void snapshot_write_tuple(struct log_io *l, uint32_t n, struct tuple *tuple) { - struct box_snap_row header; - header.op = IPROTO_INSERT; - header.m_body = 0x82; /* map of two elements. */ - header.k_space_id = IPROTO_SPACE_ID; - header.m_space_id = 0xce; /* uint32 */ - header.v_space_id = mp_bswap_u32(n); - header.k_tuple = IPROTO_TUPLE; - snapshot_write_row(l, (const char *) &header, sizeof(header), - tuple->data, tuple->bsize); + struct request_replace_body body; + body.m_body = 0x82; /* map of two elements. */ + body.k_space_id = IPROTO_SPACE_ID; + body.m_space_id = 0xce; /* uint32 */ + body.v_space_id = mp_bswap_u32(n); + body.k_tuple = IPROTO_TUPLE; + + struct iproto_packet packet; + memset(&packet, 0, sizeof(packet)); + packet.code = IPROTO_INSERT; + + packet.bodycnt = 2; + packet.body[0].iov_base = &body; + packet.body[0].iov_len = sizeof(body); + packet.body[1].iov_base = tuple->data; + packet.body[1].iov_len = tuple->bsize; + snapshot_write_row(l, &packet); } - static void snapshot_space(struct space *sp, void *udata) { @@ -356,7 +363,7 @@ snapshot_space(struct space *sp, void *udata) snapshot_write_tuple(l, space_id(sp), tuple); } -void +static void box_snapshot_cb(struct log_io *l) { space_foreach(snapshot_space, l); @@ -393,7 +400,7 @@ box_snapshot(void) * parent stdio buffers at exit(). */ close_all_xcpt(1, sayfd); - snapshot_save(recovery_state, box_snapshot_cb); + snapshot_save(recovery_state); exit(EXIT_SUCCESS); return 0; @@ -404,7 +411,7 @@ box_init_storage(const char *dirname) { struct log_dir dir = snap_dir; dir.dirname = (char *) dirname; - init_storage(&dir, NULL); + init_storage_on_master(&dir); } void diff --git a/src/box/lua/call.cc b/src/box/lua/call.cc index b9429b0638..559e66299a 100644 --- a/src/box/lua/call.cc +++ b/src/box/lua/call.cc @@ -43,6 +43,7 @@ #include "box/box.h" #include "box/port.h" #include "box/request.h" +#include "box/txn.h" #include "bit/bit.h" #include "box/access.h" #include "box/schema.h" @@ -502,8 +503,7 @@ access_check_func(const char *name, uint32_t name_len, * (implementation of 'CALL' command code). */ void -box_lua_call(struct request *request, struct txn *txn, - struct port *port) +box_lua_call(struct request *request, struct txn *txn, struct port *port) { struct user *user = user(); (void) txn; diff --git a/src/box/request.cc b/src/box/request.cc index f4703f1f09..21bee5b6e7 100644 --- a/src/box/request.cc +++ b/src/box/request.cc @@ -109,7 +109,8 @@ execute_replace(struct request *request, struct txn *txn, struct port *port) request->tuple_end); TupleGuard guard(new_tuple); space_validate_tuple(space, new_tuple); - enum dup_replace_mode mode = dup_replace_mode(request->type); + enum dup_replace_mode mode = dup_replace_mode(request->code); + txn_add_redo(txn, request); txn_replace(txn, space, NULL, new_tuple, mode); } @@ -223,33 +224,31 @@ execute_auth(struct request *request, struct txn * /* txn */, /** }}} */ void -request_check_type(uint32_t type) +request_check_code(uint32_t code) { - if (type < IPROTO_SELECT || type >= IPROTO_DML_REQUEST_MAX) - tnt_raise(LoggedError, ER_UNKNOWN_REQUEST_TYPE, type); + if (code < IPROTO_SELECT || code >= IPROTO_DML_REQUEST_MAX) + tnt_raise(LoggedError, ER_UNKNOWN_REQUEST_TYPE, code); } void -request_create(struct request *request, uint32_t type) +request_create(struct request *request, uint32_t code) { - request_check_type(type); + request_check_code(code); static const request_execute_f execute_map[] = { NULL, execute_select, execute_replace, execute_replace, execute_update, execute_delete, box_lua_call, execute_auth, }; memset(request, 0, sizeof(*request)); - request->type = type; - request->execute = execute_map[type]; + request->execute = execute_map[code]; + request->code = code; } void request_decode(struct request *request, const char *data, uint32_t len) { - assert(request->type != 0); + assert(request->execute != NULL); const char *end = data + len; - request->data = data; - request->len = len; if (mp_typeof(*data) != MP_MAP || mp_check_map(data, end) > 0) { error: @@ -303,16 +302,15 @@ request_decode(struct request *request, const char *data, uint32_t len) #endif } -void -request_encode(struct request *request) +int +request_encode(struct request *request, struct iovec *iov) { - assert(request->data == NULL); + int iovcnt = 1; const int HEADER_LEN_MAX = 32; - uint32_t tuple_len = request->tuple_end - request->tuple; uint32_t key_len = request->key_end - request->key; - uint32_t len = HEADER_LEN_MAX + tuple_len + key_len; - request->data = (char *) region_alloc(&fiber()->gc, len); - char *d = (char *) request->data + 1; /* Skip 1 byte for MP_MAP */ + uint32_t len = HEADER_LEN_MAX + key_len; + char *data = (char *) region_alloc(&fiber()->gc, len); + char *d = (char *) data + 1; /* Skip 1 byte for MP_MAP */ int map_size = 0; if (true) { d = mp_encode_uint(d, IPROTO_SPACE_ID); @@ -332,11 +330,16 @@ request_encode(struct request *request) } if (request->tuple) { d = mp_encode_uint(d, IPROTO_TUPLE); - memcpy(d, request->tuple, tuple_len); - d += tuple_len; + iov[1].iov_base = (void *) request->tuple; + iov[1].iov_len = (request->tuple_end - request->tuple); + iovcnt = 2; map_size++; } - request->len = (d - request->data); - assert(request->len <= len); - mp_encode_map((char *) request->data, map_size); + + assert(d <= data + len); + mp_encode_map(data, map_size); + iov[0].iov_base = data; + iov[0].iov_len = (d - data); + + return iovcnt; } diff --git a/src/box/request.h b/src/box/request.h index e87a46c4c9..5f856beb6d 100644 --- a/src/box/request.h +++ b/src/box/request.h @@ -34,13 +34,13 @@ struct txn; struct port; -typedef void (*request_execute_f)(struct request *, - struct txn *, - struct port *); +typedef void (*request_execute_f)(struct request *, struct txn *, struct port *); +enum { REQUEST_IOVMAX = IPROTO_PACKET_BODY_IOVMAX }; struct request { - uint32_t type; + struct iproto_packet *packet; + uint32_t code; uint32_t space_id; uint32_t index_id; uint32_t offset; @@ -53,18 +53,16 @@ struct request const char *tuple; const char *tuple_end; - const char *data; - uint32_t len; request_execute_f execute; }; void -request_create(struct request *request, uint32_t type); +request_create(struct request *request, uint32_t code); void request_decode(struct request *request, const char *data, uint32_t len); -void -request_encode(struct request *request); +int +request_encode(struct request *request, struct iovec *iov); #endif /* TARANTOOL_BOX_REQUEST_H_INCLUDED */ diff --git a/src/box/txn.cc b/src/box/txn.cc index e227be0da1..22ad5b442d 100644 --- a/src/box/txn.cc +++ b/src/box/txn.cc @@ -35,13 +35,28 @@ #include <fiber.h> #include "request.h" /* for request_name */ +void +txn_add_redo(struct txn *txn, struct request *request) +{ + if (recovery_state->wal_mode == WAL_NONE) + return; + if (request->packet == NULL) { + /* Generate binary body for Lua requests */ + struct iproto_packet *packet = (struct iproto_packet *) + region_alloc0(&fiber()->gc, sizeof(*packet)); + packet->code = request->code; + packet->bodycnt = request_encode(request, packet->body); + txn->packet = packet; + } else { + txn->packet = request->packet; + } +} + void txn_replace(struct txn *txn, struct space *space, struct tuple *old_tuple, struct tuple *new_tuple, enum dup_replace_mode mode) { - /* txn_add_undo() must be done after txn_add_redo() */ - assert(txn->request->type != 0); assert(old_tuple || new_tuple); /* * Remember the old tuple only if we replaced it @@ -77,24 +92,21 @@ txn_commit(struct txn *txn) { if ((txn->old_tuple || txn->new_tuple) && !space_is_temporary(txn->space)) { - struct request *request = txn->request; + struct iproto_packet *packet = txn->packet; int64_t lsn = next_lsn(recovery_state); int res = 0; if (recovery_state->wal_mode != WAL_NONE) { - /* Generate binary body for Lua requests */ - if (request->data == NULL) - request_encode(request); - + /* txn_commit() must be done after txn_add_redo() */ + assert(txn->packet != NULL); + packet->lsn = lsn; ev_tstamp start = ev_now(loop()), stop; - res = wal_write(recovery_state, lsn, fiber()->cookie, - request->type, request->data, - request->len); + res = wal_write(recovery_state, packet); stop = ev_now(loop()); if (stop - start > cfg.too_long_threshold) { say_warn("too long %s: %.3f sec", - iproto_request_name(request->type), + iproto_request_name(packet->code), stop - start); } } diff --git a/src/box/txn.h b/src/box/txn.h index fb0802d336..b066ebcd13 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -44,15 +44,9 @@ struct txn { struct rlist on_rollback; /* Redo info: binary packet */ - struct request *request; + struct iproto_packet *packet; }; -static inline void -txn_add_redo(struct txn *txn, struct request *request) -{ - txn->request = request; -} - struct txn *txn_begin(); void txn_commit(struct txn *txn); void txn_finish(struct txn *txn); @@ -60,4 +54,5 @@ void txn_rollback(struct txn *txn); void txn_replace(struct txn *txn, struct space *space, struct tuple *old_tuple, struct tuple *new_tuple, enum dup_replace_mode mode); +void txn_add_redo(struct txn *txn, struct request *request); #endif /* TARANTOOL_BOX_TXN_H_INCLUDED */ diff --git a/src/fio.c b/src/fio.c index 3be6160a50..7f1024bd8e 100644 --- a/src/fio.c +++ b/src/fio.c @@ -36,6 +36,7 @@ #include <limits.h> #include <stdio.h> #include <errno.h> +#include <lib/bit/bit.h> #include <say.h> @@ -158,58 +159,78 @@ fio_truncate(int fd, off_t offset) } struct fio_batch * -fio_batch_alloc(long max_iov) +fio_batch_alloc(int max_iov) { struct fio_batch *batch = (struct fio_batch *) malloc(sizeof(struct fio_batch) + - sizeof(struct iovec) * max_iov); + sizeof(struct iovec) * max_iov + + (max_iov / CHAR_BIT + 1)); if (batch == NULL) return NULL; - batch->bytes = batch->rows = batch->max_rows = 0; + batch->bytes = batch->rows = batch->iovcnt = batch->max_rows = 0; batch->max_iov = max_iov; + batch->rowflag = (char *) (batch + 1) + sizeof(struct iovec) * max_iov; return batch; } void fio_batch_start(struct fio_batch *batch, long max_rows) { - batch->bytes = batch->rows = 0; + batch->bytes = batch->rows = batch->iovcnt = 0; batch->max_rows = max_rows; + memset(batch->rowflag, 0, batch->max_iov / CHAR_BIT + 1); } void -fio_batch_add(struct fio_batch *batch, void *row, ssize_t row_len) +fio_batch_add(struct fio_batch *batch, const struct iovec *iov, int iovcnt) { + assert(!fio_batch_has_space(batch, iovcnt)); + assert(iovcnt > 0); assert(batch->max_rows > 0); - assert(! fio_batch_is_full(batch)); - - batch->iov[batch->rows].iov_base = row; - batch->iov[batch->rows].iov_len = row_len; + for (int i = 0; i < iovcnt; i++) { + batch->iov[batch->iovcnt++] = iov[i]; + batch->bytes += iov[i].iov_len; + } + bit_set(batch->rowflag, batch->iovcnt); batch->rows++; - batch->bytes += row_len; } int fio_batch_write(struct fio_batch *batch, int fd) { - ssize_t bytes_written = fio_writev(fd, batch->iov, batch->rows); + ssize_t bytes_written = fio_writev(fd, batch->iov, batch->iovcnt); if (bytes_written <= 0) return 0; if (bytes_written == batch->bytes) - return batch->rows; + return batch->rows; /* returns the number of written rows */ say_warn("fio_batch_write, [%s]: partial write," " wrote %jd out of %jd bytes", fio_filename(fd), (intmax_t) bytes_written, (intmax_t) batch->bytes); - ssize_t good_bytes = 0; + /* Iterate over end of row flags */ + struct bit_iterator bit_it; + bit_iterator_init(&bit_it, batch->rowflag, + batch->max_iov / CHAR_BIT + 1, 1); + size_t row_last_iov = bit_iterator_next(&bit_it); + + int good_rows = 0; /* the number of fully written rows */ + ssize_t good_bytes = 0; /* the number of bytes in fully written rows */ + ssize_t row_bytes = 0; /* the number of bytes in the current row */ struct iovec *iov = batch->iov; - while (iov < batch->iov + batch->rows) { - if (good_bytes + iov->iov_len > bytes_written) + while (iov < batch->iov + batch->iovcnt) { + if (good_bytes + row_bytes + iov->iov_len > bytes_written) break; - good_bytes += iov->iov_len; + row_bytes += iov->iov_len; + if ((iov - batch->iov) == row_last_iov) { + /* the end of current row */ + good_bytes += row_bytes; + row_bytes = 0; + good_rows++; + row_last_iov = bit_iterator_next(&bit_it); + } iov++; } /* @@ -232,5 +253,5 @@ fio_batch_write(struct fio_batch *batch, int fd) */ if (! errno) errno = EAGAIN; - return iov - batch->iov; + return good_rows; /* returns the number of written rows */ } diff --git a/src/fio.h b/src/fio.h index bbece9db8a..957c8273a5 100644 --- a/src/fio.h +++ b/src/fio.h @@ -153,25 +153,32 @@ struct fio_batch ssize_t bytes; /** Total number of batched rows.*/ int rows; + /** Total number of I/O vectors */ + int iovcnt; /** A cap on how many rows can be batched. Can be set to INT_MAX. */ int max_rows; /** A system cap on how many rows can be batched. */ - long max_iov; + int max_iov; + /** + * End of row flags for each iov (bitset). fio_write() tries to + * write {iov, iov, iov with flag} blocks atomically. + */ + char *rowflag; /* Batched rows. */ struct iovec iov[]; }; struct fio_batch * -fio_batch_alloc(long max_iov); +fio_batch_alloc(int max_iov); /** Begin a new batch write. Set a cap on the number of rows in the batch. */ void fio_batch_start(struct fio_batch *batch, long max_rows); static inline bool -fio_batch_is_full(struct fio_batch *batch) +fio_batch_has_space(struct fio_batch *batch, int iovcnt) { - return batch->rows >= batch->max_iov || + return batch->iovcnt + iovcnt > batch->max_iov || batch->rows >= batch->max_rows; } @@ -180,7 +187,7 @@ fio_batch_is_full(struct fio_batch *batch) * @pre fio_batch_is_full() == false */ void -fio_batch_add(struct fio_batch *batch, void *row, ssize_t row_len); +fio_batch_add(struct fio_batch *batch, const struct iovec *iov, int iovcnt); /** * Write all rows stacked into the batch. diff --git a/src/iproto.cc b/src/iproto.cc index 808bbac059..560762dcd8 100644 --- a/src/iproto.cc +++ b/src/iproto.cc @@ -77,9 +77,10 @@ struct iproto_request struct session *session; iproto_request_f process; /* Request message code and sync. */ - uint32_t header[2]; + struct iproto_packet packet; /* Box request, if this is a DML */ struct request request; + size_t total_len; }; struct mempool iproto_request_pool; @@ -383,18 +384,6 @@ iproto_connection_close(struct iproto_connection *con) close(fd); } -static inline void -iproto_validate_header(uint32_t len) -{ - if (len > IPROTO_BODY_LEN_MAX) { - /* - * The package is too big, just close connection for now to - * avoid DoS. - */ - tnt_raise(IllegalParams, "received packet is too big"); - } -} - /** * If there is no space for reading input, we can do one of the * following: @@ -468,73 +457,24 @@ iproto_connection_input_iobuf(struct iproto_connection *con) return newbuf; } - -int64_t -subscribe_request_decode(const char *begin, const char *end) -{ - const char *pos = begin; - if (mp_check(&pos, end)) - goto error; - if (mp_typeof(*begin) != MP_MAP) - goto error; - mp_decode_map(&begin); - /* Key */ - if (mp_typeof(*begin) != MP_UINT) - goto error; - mp_decode_uint(&begin); - /* Value */ - if (mp_typeof(*begin) != MP_UINT) - goto error; - return mp_decode_uint(&begin); -error: - tnt_raise(ClientError, ER_INVALID_MSGPACK, "subscribe request body"); -} - -static inline void -iproto_decode_header(const char **pos, const char *end, uint32_t *keys) -{ - /* Only a small map can be here. */ - if (mp_typeof(**pos) != MP_MAP || mp_check_map(*pos, end) > 0) { -error: - tnt_raise(ClientError, - ER_INVALID_MSGPACK, "packet header"); - } - uint32_t size = mp_decode_map(pos); - for (int i = 0; i < size; i++) { - - if (! iproto_header_has_key(*pos, end)) { - mp_check(pos, end); - mp_check(pos, end); - continue; - } - - unsigned char key = mp_decode_uint(pos); - - if (mp_typeof(**pos) != MP_UINT || - mp_check_uint(*pos, end) > 0) - goto error; - - keys[key] = mp_decode_uint(pos); - } -} - static void iproto_process_admin(struct iproto_request *ireq, - struct iproto_connection *con, - const char *body, const char *end) + struct iproto_connection *con) { - switch (ireq->header[IPROTO_CODE]) { + switch (ireq->packet.code) { case IPROTO_PING: - iproto_reply_ping(&ireq->iobuf->out, - ireq->header[IPROTO_SYNC]); + iproto_reply_ping(&ireq->iobuf->out, ireq->packet.sync); break; case IPROTO_SUBSCRIBE: - subscribe(con->input.fd, - subscribe_request_decode(body, end)); + if (ireq->packet.bodycnt != 0) { + tnt_raise(ClientError, ER_INVALID_MSGPACK, + "subscribe request body"); + } + subscribe(con->input.fd, ireq->packet.lsn, ireq->packet.sync); tnt_raise(IprotoConnectionShutdown); default: tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE, - (uint32_t) ireq->header[IPROTO_CODE]); + (uint32_t) ireq->packet.code); } if (! ev_is_active(&con->output)) ev_feed_event(con->loop, &con->output, EV_WRITE); @@ -556,7 +496,8 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) if (mp_check_uint(pos, in->end) >= 0) break; uint32_t len = mp_decode_uint(&pos); - iproto_validate_header(len); + + /* Skip fixheader */ const char *reqend = pos + len; if (reqend > in->end) break; @@ -564,25 +505,32 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) iproto_request_new(con, iproto_process_dml); IprotoRequestGuard guard(ireq); - ireq->header[IPROTO_CODE] = ireq->header[IPROTO_SYNC] = 0; - iproto_decode_header(&pos, reqend, ireq->header); + iproto_packet_decode(&ireq->packet, &pos, reqend); + ireq->total_len = pos - reqstart; /* total request length */ + /* * sic: in case of exception con->parse_size * as well as in->pos must not be advanced, to * stay in sync. */ - if (iproto_request_is_dml(ireq->header[IPROTO_CODE])) { - request_create(&ireq->request, - ireq->header[IPROTO_CODE]); - request_decode(&ireq->request, pos, reqend - pos); + if (iproto_request_is_dml(ireq->packet.code)) { + if (ireq->packet.bodycnt == 0) { + tnt_raise(IllegalParams, + "Invalid MsgPack - invalid request"); + } + request_create(&ireq->request, ireq->packet.code); + pos = (const char *) ireq->packet.body[0].iov_base; + request_decode(&ireq->request, pos, + ireq->packet.body[0].iov_len); + ireq->request.packet = &ireq->packet; iproto_queue_push(&request_queue, guard.release()); - /* Request header can be discarded. */ - in->pos += pos - reqstart; - } else { - iproto_process_admin(ireq, con, pos, reqend); + /* Request will be discarded in iproto_process_dml */ + } else { + iproto_process_admin(ireq, con); /* Entire request can be discarded. */ - in->pos += reqend - reqstart; + in->pos += ireq->packet.body[0].iov_len; } + /* Request is parsed */ con->parse_size -= reqend - reqstart; if (con->parse_size == 0) @@ -723,7 +671,8 @@ iproto_process_dml(struct iproto_request *ireq) struct iproto_connection *con = ireq->connection; auto scope_guard = make_scoped_guard([=]{ - iobuf->in.pos += ireq->request.len; + /* Discard request (see iproto_enqueue_batch()) */ + iobuf->in.pos += ireq->total_len; if (evio_is_active(&con->output)) { if (! ev_is_active(&con->output)) @@ -741,13 +690,13 @@ iproto_process_dml(struct iproto_request *ireq) struct obuf *out = &iobuf->out; struct iproto_port port; - iproto_port_init(&port, out, ireq->header[IPROTO_SYNC]); + iproto_port_init(&port, out, ireq->packet.sync); try { box_process((struct port *) &port, &ireq->request); } catch (ClientError *e) { if (port.found) obuf_rollback_to_svp(out, &port.svp); - iproto_reply_error(out, e, ireq->header[IPROTO_SYNC]); + iproto_reply_error(out, e, ireq->packet.sync); } } @@ -793,8 +742,9 @@ iproto_process_connect(struct iproto_request *request) con->session = session_create(fd, con->cookie); coio_write(&con->input, iproto_greeting(con->session->salt), IPROTO_GREETING_SIZE); + trigger_run(&session_on_connect, NULL); } catch (ClientError *e) { - iproto_reply_error(&iobuf->out, e, request->header[IPROTO_SYNC]); + iproto_reply_error(&iobuf->out, e, request->packet.code); try { iproto_flush(iobuf, fd, &con->write_pos); } catch (Exception *e) { diff --git a/src/iproto_constants.c b/src/iproto_constants.c deleted file mode 100644 index 31ed925aec..0000000000 --- a/src/iproto_constants.c +++ /dev/null @@ -1,68 +0,0 @@ -#include "iproto_constants.h" -#include "msgpuck/msgpuck.h" - -unsigned char iproto_key_type[IPROTO_KEY_MAX] = -{ - /* {{{ header */ - /* 0x00 */ MP_UINT, /* IPROTO_CODE */ - /* 0x01 */ MP_UINT, /* IPROTO_SYNC */ - /* }}} */ - - /* {{{ unused */ - /* 0x02 */ MP_UINT, - /* 0x03 */ MP_UINT, - /* 0x04 */ MP_UINT, - /* 0x05 */ MP_UINT, - /* 0x06 */ MP_UINT, - /* 0x07 */ MP_UINT, - /* 0x08 */ MP_UINT, - /* 0x09 */ MP_UINT, - /* 0x0a */ MP_UINT, - /* 0x0b */ MP_UINT, - /* 0x0c */ MP_UINT, - /* 0x0d */ MP_UINT, - /* 0x0e */ MP_UINT, - /* 0x0f */ MP_UINT, - /* }}} */ - - /* {{{ body -- integer keys */ - /* 0x10 */ MP_UINT, /* IPROTO_SPACE_ID */ - /* 0x11 */ MP_UINT, /* IPROTO_INDEX_ID */ - /* 0x12 */ MP_UINT, /* IPROTO_LIMIT */ - /* 0x13 */ MP_UINT, /* IPROTO_OFFSET */ - /* 0x14 */ MP_UINT, /* IPROTO_ITERATOR */ - /* }}} */ - - /* {{{ unused */ - /* 0x15 */ MP_UINT, - /* 0x16 */ MP_UINT, - /* 0x17 */ MP_UINT, - /* 0x18 */ MP_UINT, - /* 0x19 */ MP_UINT, - /* 0x1a */ MP_UINT, - /* 0x1b */ MP_UINT, - /* 0x1c */ MP_UINT, - /* 0x1d */ MP_UINT, - /* 0x1e */ MP_UINT, - /* 0x1f */ MP_UINT, - /* }}} */ - - /* {{{ body -- all keys */ - /* 0x20 */ MP_ARRAY, /* IPROTO_KEY */ - /* 0x21 */ MP_ARRAY, /* IPROTO_TUPLE */ - /* 0x22 */ MP_STR, /* IPROTO_FUNCTION_NAME */ - /* 0x23 */ MP_STR, /* IPROTO_USER_NAME */ - /* }}} */ -}; - -const char *iproto_request_type_strs[] = -{ - NULL, - "SELECT", - "INSERT", - "REPLACE", - "UPDATE", - "DELETE", - "CALL", - NULL, -}; diff --git a/src/iproto_constants.cc b/src/iproto_constants.cc new file mode 100644 index 0000000000..7aa1d975e4 --- /dev/null +++ b/src/iproto_constants.cc @@ -0,0 +1,193 @@ +/* + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "iproto_constants.h" +#include "msgpuck/msgpuck.h" +#include "exception.h" +#include "fiber.h" +#include "crc32.h" + +const unsigned char iproto_key_type[IPROTO_KEY_MAX] = +{ + /* {{{ header */ + /* 0x00 */ MP_UINT, /* IPROTO_CODE */ + /* 0x01 */ MP_UINT, /* IPROTO_SYNC */ + /* 0x02 */ MP_UINT, /* IPROTO_SERVER_ID */ + /* 0x03 */ MP_UINT, /* IPROTO_LSN */ + /* 0x04 */ MP_DOUBLE, /* IPROTO_TIMESTAMP */ + /* }}} */ + + /* {{{ unused */ + /* 0x05 */ MP_UINT, + /* 0x06 */ MP_UINT, + /* 0x07 */ MP_UINT, + /* 0x08 */ MP_UINT, + /* 0x09 */ MP_UINT, + /* 0x0a */ MP_UINT, + /* 0x0b */ MP_UINT, + /* 0x0c */ MP_UINT, + /* 0x0d */ MP_UINT, + /* 0x0e */ MP_UINT, + /* 0x0f */ MP_UINT, + /* }}} */ + + /* {{{ body -- integer keys */ + /* 0x10 */ MP_UINT, /* IPROTO_SPACE_ID */ + /* 0x11 */ MP_UINT, /* IPROTO_INDEX_ID */ + /* 0x12 */ MP_UINT, /* IPROTO_LIMIT */ + /* 0x13 */ MP_UINT, /* IPROTO_OFFSET */ + /* 0x14 */ MP_UINT, /* IPROTO_ITERATOR */ + /* }}} */ + + /* {{{ unused */ + /* 0x15 */ MP_UINT, + /* 0x16 */ MP_UINT, + /* 0x17 */ MP_UINT, + /* 0x18 */ MP_UINT, + /* 0x19 */ MP_UINT, + /* 0x1a */ MP_UINT, + /* 0x1b */ MP_UINT, + /* 0x1c */ MP_UINT, + /* 0x1d */ MP_UINT, + /* 0x1e */ MP_UINT, + /* 0x1f */ MP_UINT, + /* }}} */ + + /* {{{ body -- all keys */ + /* 0x20 */ MP_ARRAY, /* IPROTO_KEY */ + /* 0x21 */ MP_ARRAY, /* IPROTO_TUPLE */ + /* 0x22 */ MP_STR, /* IPROTO_FUNCTION_NAME */ + /* 0x23 */ MP_STR, /* IPROTO_USER_NAME */ + /* }}} */ +}; + +const char *iproto_request_type_strs[] = +{ + NULL, + "SELECT", + "INSERT", + "REPLACE", + "UPDATE", + "DELETE", + "CALL" +}; + +void +iproto_packet_decode(struct iproto_packet *packet, const char **pos, + const char *end) +{ + memset(packet, 0, sizeof(*packet)); + const char *pos2 = *pos; + if (mp_check(&pos2, end) != 0) { +error: + tnt_raise(ClientError, ER_INVALID_MSGPACK, "packet header"); + } + + if (mp_typeof(**pos) != MP_MAP) + goto error; + + uint32_t size = mp_decode_map(pos); + for (uint32_t i = 0; i < size; i++) { + if (mp_typeof(**pos) != MP_UINT) + goto error; + unsigned char key = mp_decode_uint(pos); + if (iproto_key_type[key] != mp_typeof(**pos)) + goto error; + switch (key) { + case IPROTO_CODE: + packet->code = mp_decode_uint(pos); + break; + case IPROTO_SYNC: + packet->sync = mp_decode_uint(pos); + break; + case IPROTO_LSN: + packet->lsn = mp_decode_uint(pos); + break; + case IPROTO_TIMESTAMP: + packet->tm = mp_decode_double(pos); + break; + default: + /* unknown header */ + mp_next(pos); + } + } + + assert(*pos <= end); + if (*pos < end) { + packet->bodycnt = 1; + packet->body[0].iov_base = (void *) *pos; + packet->body[0].iov_len = (end - *pos); + *pos = end; + } +} + +int +iproto_packet_encode(const struct iproto_packet *packet, struct iovec *iov) +{ + enum { HEADER_LEN_MAX = 40 }; + + /* allocate memory for sign + header */ + char *data = (char *) region_alloc(&fiber()->gc, HEADER_LEN_MAX); + + /* Header */ + char *d = data + 1; /* Skip 1 byte for MP_MAP */ + int map_size = 0; + if (true) { + d = mp_encode_uint(d, IPROTO_CODE); + d = mp_encode_uint(d, packet->code); + map_size++; + } + + if (packet->sync) { + d = mp_encode_uint(d, IPROTO_SYNC); + d = mp_encode_uint(d, packet->sync); + map_size++; + } + + if (packet->lsn) { + d = mp_encode_uint(d, IPROTO_LSN); + d = mp_encode_uint(d, packet->lsn); + map_size++; + } + + if (packet->tm) { + d = mp_encode_uint(d, IPROTO_TIMESTAMP); + d = mp_encode_double(d, packet->tm); + map_size++; + } + + assert(d <= data + HEADER_LEN_MAX); + mp_encode_map(data, map_size); + iov->iov_base = data; + iov->iov_len = (d - data); + iov++; + + memcpy(iov, packet->body, sizeof(*iov) * packet->bodycnt); + assert(1 + packet->bodycnt <= IPROTO_PACKET_IOVMAX); + return 1 + packet->bodycnt; /* new iovcnt */ +} diff --git a/src/iproto_constants.h b/src/iproto_constants.h index 63ff74afe4..3309d043b4 100644 --- a/src/iproto_constants.h +++ b/src/iproto_constants.h @@ -30,17 +30,29 @@ */ #include <stdbool.h> #include <stdint.h> +#include <sys/uio.h> /* struct iovec */ +#include <msgpuck/msgpuck.h> + +#if defined(__cplusplus) +extern "C" { +#endif enum { /** Maximal iproto package body length (2GiB) */ IPROTO_BODY_LEN_MAX = 2147483648UL, IPROTO_GREETING_SIZE = 128, + IPROTO_FIXHEADER_SIZE = 5, /* len + (padding) */ + XLOG_FIXHEADER_SIZE = 19 /* marker + len + prev crc32 + cur crc32 + (padding) */ }; enum iproto_key { IPROTO_CODE = 0x00, IPROTO_SYNC = 0x01, + /* replication keys */ + IPROTO_SERVER_ID = 0x02, + IPROTO_LSN = 0x03, + IPROTO_TIMESTAMP = 0x04, /* Leave a gap for other keys in the header. */ IPROTO_SPACE_ID = 0x10, IPROTO_INDEX_ID = 0x11, @@ -60,7 +72,7 @@ enum iproto_key { #define bit(c) (1ULL<<IPROTO_##c) -#define IPROTO_HEAD_BMAP (bit(CODE) | bit(SYNC)) +#define IPROTO_HEAD_BMAP (bit(CODE) | bit(SYNC) | bit(SERVER_ID) | bit(LSN)) #define IPROTO_BODY_BMAP (bit(SPACE_ID) | bit(INDEX_ID) | bit(LIMIT) |\ bit(OFFSET) | bit(KEY) | bit(TUPLE) | \ bit(FUNCTION_NAME) | bit(USER_NAME)) @@ -80,8 +92,7 @@ iproto_body_has_key(const char *pos, const char *end) #undef bit - -extern unsigned char iproto_key_type[IPROTO_KEY_MAX]; +extern const unsigned char iproto_key_type[IPROTO_KEY_MAX]; enum iproto_request_type { IPROTO_SELECT = 1, @@ -118,4 +129,51 @@ iproto_request_is_dml(uint32_t type) return type < IPROTO_DML_REQUEST_MAX; } +enum { + IPROTO_PACKET_HEAD_IOVMAX = 1, + IPROTO_PACKET_BODY_IOVMAX = 2, + IPROTO_PACKET_IOVMAX = IPROTO_PACKET_HEAD_IOVMAX + + IPROTO_PACKET_BODY_IOVMAX +}; + +struct iproto_packet { + uint32_t code; + uint64_t sync; + uint64_t lsn; + double tm; + + int bodycnt; + struct iovec body[IPROTO_PACKET_BODY_IOVMAX]; +}; + +void +iproto_packet_decode(struct iproto_packet *packet, const char **pos, const char *end); +int +iproto_packet_encode(const struct iproto_packet *packet, struct iovec *out); + +struct iproto_subscribe { + uint8_t m_len; /* MP_STR */ + uint32_t v_len; /* length */ + uint8_t m_header; /* MP_MAP */ + uint8_t k_code; /* IPROTO_CODE */ + uint8_t v_code; /* response status */ + uint8_t k_sync; /* IPROTO_SYNC */ + uint8_t m_sync; /* MP_UINT64 */ + uint64_t sync; /* sync */ + uint8_t k_lsn; /* IPROTO_LSN */ + uint8_t m_lsn; /* MP_UINT64 */ + uint64_t lsn; /* lsn */ +} __attribute__((packed)); + +static const struct iproto_subscribe iproto_subscribe_stub = { + 0xce, mp_bswap_u32(sizeof(struct iproto_subscribe) - 5), 0x83, + IPROTO_CODE, IPROTO_SUBSCRIBE, + IPROTO_SYNC, 0xcf, 0, + IPROTO_LSN, 0xcf, 0 +}; + +#if defined(__cplusplus) +} /* extern "C" */ +#endif + #endif /* TARANTOOL_IPROTO_CONSTANTS_H_INCLUDED */ diff --git a/src/iproto_port.cc b/src/iproto_port.cc index f8a3f38339..15bcc8fabb 100644 --- a/src/iproto_port.cc +++ b/src/iproto_port.cc @@ -39,11 +39,11 @@ struct iproto_header_bin { uint32_t v_code; /* response status */ uint8_t k_sync; /* IPROTO_SYNC */ uint8_t m_sync; /* MP_UIN32 */ - uint32_t v_sync; /* sync */ + uint64_t v_sync; /* sync */ } __attribute__((packed)); static const struct iproto_header_bin iproto_header_bin = { - 0xce, 0, 0x82, IPROTO_CODE, 0xce, 0, IPROTO_SYNC, 0xce, 0 + 0xce, 0, 0x82, IPROTO_CODE, 0xce, 0, IPROTO_SYNC, 0xcf, 0 }; struct iproto_body_bin { @@ -62,16 +62,16 @@ static const struct iproto_body_bin iproto_error_bin = { }; void -iproto_reply_ping(struct obuf *out, uint32_t sync) +iproto_reply_ping(struct obuf *out, uint64_t sync) { struct iproto_header_bin reply = iproto_header_bin; reply.v_len = mp_bswap_u32(sizeof(iproto_header_bin) - 5); - reply.v_sync = mp_bswap_u32(sync); + reply.v_sync = mp_bswap_u64(sync); obuf_dup(out, &reply, sizeof(reply)); } void -iproto_reply_error(struct obuf *out, const ClientError *e, uint32_t sync) +iproto_reply_error(struct obuf *out, const ClientError *e, uint64_t sync) { uint32_t msg_len = strlen(e->errmsg()); @@ -81,7 +81,7 @@ iproto_reply_error(struct obuf *out, const ClientError *e, uint32_t sync) uint32_t len = sizeof(header) - 5 + sizeof(body) + msg_len; header.v_len = mp_bswap_u32(len); header.v_code = mp_bswap_u32(tnt_errcode_val(e->errcode())); - header.v_sync = mp_bswap_u32(sync); + header.v_sync = mp_bswap_u64(sync); body.v_data_len = mp_bswap_u32(msg_len); @@ -110,7 +110,7 @@ iproto_port_eof(struct port *ptr) struct iproto_header_bin header = iproto_header_bin; header.v_len = mp_bswap_u32(len); - header.v_sync = mp_bswap_u32(port->sync); + header.v_sync = mp_bswap_u64(port->sync); struct iproto_body_bin body = iproto_body_bin; body.v_data_len = mp_bswap_u32(port->found); diff --git a/src/iproto_port.h b/src/iproto_port.h index 9de89ab863..b1791fecb3 100644 --- a/src/iproto_port.h +++ b/src/iproto_port.h @@ -60,7 +60,7 @@ struct iproto_port /** Output buffer. */ struct obuf *buf; /** Reply header. */ - uint32_t sync; + uint64_t sync; uint32_t found; /** A pointer in the reply buffer where the reply starts. */ struct obuf_svp svp; @@ -70,7 +70,7 @@ extern struct port_vtab iproto_port_vtab; static inline void iproto_port_init(struct iproto_port *port, struct obuf *buf, - uint32_t sync) + uint64_t sync) { port->vtab = &iproto_port_vtab; port->buf = buf; @@ -80,11 +80,10 @@ iproto_port_init(struct iproto_port *port, struct obuf *buf, /** Stack a reply to 'ping' packet. */ void -iproto_reply_ping(struct obuf *out, uint32_t sync); +iproto_reply_ping(struct obuf *out, uint64_t sync); /** Send an error packet back. */ void -iproto_reply_error(struct obuf *out, const ClientError *e, - uint32_t sync); +iproto_reply_error(struct obuf *out, const ClientError *e, uint64_t sync); #endif /* TARANTOOL_IPROTO_PORT_H_INCLUDED */ diff --git a/src/log_io.cc b/src/log_io.cc index e7e699876b..7fff98fd78 100644 --- a/src/log_io.cc +++ b/src/log_io.cc @@ -35,37 +35,22 @@ #include "fio.h" #include "tarantool_eio.h" #include "fiob.h" +#include "msgpuck/msgpuck.h" +#include "iproto_constants.h" const uint32_t xlog_format = 12; -const log_magic_t row_marker = 0xba0babed; -const log_magic_t eof_marker = 0x10adab1e; + +/* + * marker is MsgPack fixext2 + * +--------+--------+--------+--------+ + * | 0xd5 | type | data | + * +--------+--------+--------+--------+ + */ +const log_magic_t row_marker = mp_bswap_u32(0xd5ba0bab); /* host byte order */ +const log_magic_t eof_marker = mp_bswap_u32(0xd510aded); /* host byte order */ const char inprogress_suffix[] = ".inprogress"; const char v12[] = "0.12\n"; -void -log_row_sign(struct log_row *header) -{ - header->data_crc32c = crc32_calc(0, header->data, header->len); - header->header_crc32c = crc32_calc(0, header->header, sizeof(*header) - - offsetof(struct log_row, header)); -} - -void -log_row_fill(struct log_row *row, int64_t lsn, uint64_t cookie, - const char *metadata, size_t metadata_len, const char *data, - size_t data_len) -{ - row->marker = row_marker; - row->tag = WAL; /* unused. */ - row->cookie = cookie; - row->lsn = lsn; - row->tm = ev_now(loop()); - row->len = metadata_len + data_len; - - memcpy(row->data, metadata, metadata_len); - memcpy(row->data + metadata_len, data, data_len); -} - struct log_dir snap_dir = { /* .panic_if_error = */ false, /* .sync_is_async = */ false, @@ -222,39 +207,97 @@ format_filename(struct log_dir *dir, int64_t lsn, enum log_suffix suffix) /* {{{ struct log_io_cursor */ -static struct log_row ROW_EOF; - -static const struct log_row * -row_reader(FILE *f) +static int +row_reader(FILE *f, struct iproto_packet *packet) { - struct log_row m; - - uint32_t header_crc, data_crc; + const char *data; - if (fread(&m.header_crc32c, sizeof(m) - sizeof(log_magic_t), 1, f) != 1) - return &ROW_EOF; + /* Read fixed header */ + char fixheader[XLOG_FIXHEADER_SIZE - sizeof(log_magic_t)]; + if (fread(fixheader, sizeof(fixheader), 1, f) != 1) { + if (feof(f)) + return 1; +error: + tnt_raise(ClientError, ER_INVALID_MSGPACK, + "invalid fixed header"); + } - header_crc = crc32_calc(0, m.header, sizeof(struct log_row) - - offsetof(struct log_row, header)); + /* Decode len, previous crc32 and row crc32 */ + data = fixheader; + if (mp_check(&data, data + sizeof(fixheader)) != 0) + goto error; + data = fixheader; - if (m.header_crc32c != header_crc) { - say_error("header crc32c mismatch"); - return NULL; + /* Read length */ + if (mp_typeof(*data) != MP_UINT) + goto error; + uint32_t len = mp_decode_uint(&data); + if (len > IPROTO_BODY_LEN_MAX) { + tnt_raise(ClientError, ER_INVALID_MSGPACK, + "received packet is too big"); } - char *row = (char *) region_alloc(&fiber()->gc, sizeof(m) + m.len); - memcpy(row, &m, sizeof(m)); - if (fread(row + sizeof(m), m.len, 1, f) != 1) - return &ROW_EOF; + /* Read previous crc32 */ + if (mp_typeof(*data) != MP_UINT) + goto error; + + /* Read current crc32 */ + uint32_t crc32p = mp_decode_uint(&data); + if (mp_typeof(*data) != MP_UINT) + goto error; + uint32_t crc32c = mp_decode_uint(&data); + assert(data <= fixheader + sizeof(fixheader)); + (void) crc32p; + + /* Allocate memory for body */ + char *bodybuf = (char *) region_alloc(&fiber()->gc, len); + + /* Read header and body */ + if (fread(bodybuf, len, 1, f) != 1) + return 1; + + /* Validate checksum */ + if (crc32_calc(0, bodybuf, len) != crc32c) + tnt_raise(ClientError, ER_INVALID_MSGPACK, "invalid crc32"); + + data = bodybuf; + iproto_packet_decode(packet, &data, bodybuf + len); + + return 0; +} - data_crc = crc32_calc(0, row + sizeof(m), m.len); - if (m.data_crc32c != data_crc) { - say_error("data crc32c mismatch"); - return NULL; +int +xlog_encode_row(const struct iproto_packet *packet, struct iovec *iov, + char fixheader[XLOG_FIXHEADER_SIZE]) +{ + int iovcnt = iproto_packet_encode(packet, iov + 1) + 1; + uint32_t len = 0; + uint32_t crc32p = 0; + uint32_t crc32c = 0; + for (int i = 1; i < iovcnt; i++) { + crc32c = crc32_calc(crc32c, (const char *) iov[i].iov_base, + iov[i].iov_len); + len += iov[i].iov_len; } - say_debug("read row v11 success lsn:%lld", (long long) m.lsn); - return (const struct log_row *) row; + char *data = fixheader; + *(log_magic_t *) data = row_marker; + data += sizeof(row_marker); + data = mp_encode_uint(data, len); + /* Encode crc32 for previous row */ + data = mp_encode_uint(data, crc32p); + /* Encode crc32 for current row */ + data = mp_encode_uint(data, crc32c); + /* Encode padding */ + ssize_t padding = XLOG_FIXHEADER_SIZE - (data - fixheader); + if (padding > 0) + data = mp_encode_strl(data, padding - 1) + padding - 1; + assert(data == fixheader + XLOG_FIXHEADER_SIZE); + iov[0].iov_base = fixheader; + iov[0].iov_len = XLOG_FIXHEADER_SIZE; + + assert(iovcnt <= XLOG_ROW_IOVMAX); + return iovcnt; } void @@ -288,11 +331,10 @@ log_io_cursor_close(struct log_io_cursor *i) * @param i iterator object, encapsulating log specifics. * */ -const struct log_row * -log_io_cursor_next(struct log_io_cursor *i) +int +log_io_cursor_next(struct log_io_cursor *i, struct iproto_packet *packet) { struct log_io *l = i->log; - const struct log_row *row; log_magic_t magic; off_t marker_offset = 0; @@ -331,11 +373,10 @@ log_io_cursor_next(struct log_io_cursor *i) (uintmax_t)i->good_offset); say_debug("magic found at 0x%08jx", (uintmax_t)marker_offset); - row = row_reader(l->f); - if (row == &ROW_EOF) - goto eof; - - if (row == NULL) { + try { + if (row_reader(l->f, packet) != 0) + goto eof; + } catch (Exception *e) { if (l->dir->panic_if_error) panic("failed to read row"); say_warn("failed to read row"); @@ -348,7 +389,7 @@ log_io_cursor_next(struct log_io_cursor *i) if (i->row_count % 100000 == 0) say_info("%.1fM rows processed", i->row_count / 1000000.); - return row; + return 0; eof: /* * The only two cases of fully read file: @@ -380,7 +421,7 @@ log_io_cursor_next(struct log_io_cursor *i) } } /* No more rows. */ - return NULL; + return 1; } /* }}} */ diff --git a/src/log_io.h b/src/log_io.h index 0ecc56669a..764d767091 100644 --- a/src/log_io.h +++ b/src/log_io.h @@ -31,8 +31,10 @@ #include <stdio.h> #include <limits.h> #include <stdbool.h> +#include <sys/uio.h> #include "trivia/util.h" #include "tarantool_ev.h" +#include "iproto_constants.h" extern const uint32_t xlog_format; @@ -111,40 +113,15 @@ log_io_cursor_open(struct log_io_cursor *i, struct log_io *l); void log_io_cursor_close(struct log_io_cursor *i); -const struct log_row * -log_io_cursor_next(struct log_io_cursor *i); +int +log_io_cursor_next(struct log_io_cursor *i, struct iproto_packet *packet); +int +xlog_encode_row(const struct iproto_packet *packet, struct iovec *iov, + char fixheader[XLOG_FIXHEADER_SIZE]); +enum { XLOG_ROW_IOVMAX = IPROTO_PACKET_IOVMAX + 1 }; typedef uint32_t log_magic_t; -struct log_row { - log_magic_t marker; - uint32_t header_crc32c; /* calculated for the header block */ - /* {{{ header block */ - char header[0]; /* start of the header */ - int64_t lsn; - double tm; - uint32_t len; - uint16_t tag; - uint64_t cookie; - uint32_t data_crc32c; /* calculated for data */ - /* }}} */ - char data[0]; /* start of the data */ -} __attribute__((packed)); - -void -log_row_sign(struct log_row *row); - -void -log_row_fill(struct log_row *row, int64_t lsn, uint64_t cookie, - const char *metadata, size_t metadata_len, const char *data, - size_t data_len); - -static inline size_t -log_row_size(const struct log_row *row) -{ - return sizeof(struct log_row) + row->len; -} - int inprogress_log_unlink(char *filename); int diff --git a/src/recovery.cc b/src/recovery.cc index 31c0b01f87..0efbc2e730 100644 --- a/src/recovery.cc +++ b/src/recovery.cc @@ -40,6 +40,9 @@ #include "replica.h" #include "fiber.h" +#include "msgpuck/msgpuck.h" +#include "iproto_constants.h" +#include "crc32.h" /* * Recovery subsystem @@ -207,7 +210,7 @@ recovery_stop_local(struct recovery_state *r); void recovery_init(const char *snap_dirname, const char *wal_dirname, row_handler row_handler, void *row_handler_param, - int rows_per_wal) + snapshot_handler snapshot_handler, int rows_per_wal) { assert(recovery_state == NULL); recovery_state = (struct recovery_state *) calloc(1, sizeof(struct recovery_state)); @@ -219,6 +222,8 @@ recovery_init(const char *snap_dirname, const char *wal_dirname, r->row_handler = row_handler; r->row_handler_param = row_handler_param; + r->snapshot_handler = snapshot_handler; + r->snap_dir = &snap_dir; r->snap_dir->dirname = strdup(snap_dirname); r->wal_dir = &wal_dir; @@ -291,7 +296,7 @@ recovery_setup_panic(struct recovery_state *r, bool on_snap_error, bool on_wal_e * @return panics on error * Errors are logged to the log file. */ -static void +void init_storage_on_master(struct log_dir *dir) { const char *filename = format_filename(dir, 1 /* lsn */, NONE); @@ -307,47 +312,13 @@ init_storage_on_master(struct log_dir *dir) filename); } close(fd); -} - -/** Download the latest snapshot from master. */ -static void -init_storage_on_replica(struct log_dir *dir, const char *replication_source) -{ - say_info("downloading snapshot from master %s...", - replication_source); - - int master = replica_bootstrap(replication_source); - FDGuard guard_master(master); - - struct { - uint64_t lsn; - uint64_t file_size; - } response; - sio_readn(master, &response, sizeof(response)); - - const char *filename = format_filename(dir, response.lsn, NONE); - say_info("saving snapshot `%s'", filename); - int fd = open(filename, O_WRONLY|O_CREAT|O_EXCL, dir->mode); - if (fd == -1) { - panic_syserror("failed to open snapshot file `%s' for " - "writing", filename); - } - FDGuard guard_fd(fd); - - sio_recvfile(master, fd, NULL, response.file_size); -} - -/** Create the initial snapshot file in the snap directory. */ -void -init_storage(struct log_dir *dir, const char *replication_source) -{ - if (replication_source) - init_storage_on_replica(dir, replication_source); - else - init_storage_on_master(dir); say_info("done"); } +/** + * Read a snapshot and call row_handler for every snapshot row. + * Panic in case of error. + */ /** * Read a snapshot and call row_handler for every snapshot row. * Panic in case of error. @@ -361,12 +332,21 @@ recover_snap(struct recovery_state *r, const char *replication_source) struct log_io *snap; int64_t lsn; + int rc = 0; lsn = greatest_lsn(r->snap_dir); if (lsn == 0 && greatest_lsn(r->wal_dir) == 0) { say_info("found an empty data directory, initializing..."); - init_storage(r->snap_dir, replication_source); - lsn = greatest_lsn(r->snap_dir); + if (replication_source) { + /* play rows and save snapshot */ + replica_bootstrap(r, replication_source); + snapshot_save(r); + assert(r->lsn == greatest_lsn(r->snap_dir)); + return; + } else { + init_storage_on_master(r->snap_dir); + lsn = greatest_lsn(r->snap_dir); + } } if (lsn <= 0) { @@ -383,18 +363,19 @@ recover_snap(struct recovery_state *r, const char *replication_source) log_io_cursor_open(&i, snap); - const struct log_row *row; - while ((row = log_io_cursor_next(&i))) { - if (r->row_handler(r->row_handler_param, row) < 0) { + struct iproto_packet packet; + while (log_io_cursor_next(&i, &packet) == 0) { + if (r->row_handler(r->row_handler_param, &packet) < 0) { say_error("can't apply row"); if (snap->dir->panic_if_error) break; + rc = 1; } } log_io_cursor_close(&i); log_io_close(&snap); - if (row == NULL) { + if (rc == 0) { r->lsn = r->confirmed_lsn = lsn; say_info("snapshot recovered, confirmed lsn: %" PRIi64, r->confirmed_lsn); @@ -415,7 +396,7 @@ recover_snap(struct recovery_state *r, const char *replication_source) * @retval 0 EOF * @retval 1 ok, maybe read something */ -static int +int recover_wal(struct recovery_state *r, struct log_io *l) { int res = -1; @@ -423,9 +404,9 @@ recover_wal(struct recovery_state *r, struct log_io *l) log_io_cursor_open(&i, l); - const struct log_row *row; - while ((row = log_io_cursor_next(&i))) { - if (row->lsn <= r->confirmed_lsn) { + struct iproto_packet packet; + while (log_io_cursor_next(&i, &packet) == 0) { + if (packet.lsn <= r->confirmed_lsn) { say_debug("skipping too young row"); continue; } @@ -433,12 +414,12 @@ recover_wal(struct recovery_state *r, struct log_io *l) * After handler(row) returned, row may be * modified, do not use it. */ - if (r->row_handler(r->row_handler_param, row) < 0) { + if (r->row_handler(r->row_handler_param, &packet) < 0) { say_error("can't apply row"); if (l->dir->panic_if_error) goto end; } - set_lsn(r, row->lsn); + set_lsn(r, packet.lsn); } res = i.eof_read ? LOG_EOF : 1; end: @@ -773,7 +754,8 @@ struct wal_write_request { /* Auxiliary. */ int res; struct fiber *fiber; - struct log_row row; + struct iproto_packet *packet; + char wal_fixheader[XLOG_FIXHEADER_SIZE]; }; /* Context of the WAL writer thread. */ @@ -1102,10 +1084,11 @@ wal_fill_batch(struct log_io *wal, struct fio_batch *batch, int rows_per_wal, /* Post-condition of successful wal_opt_rotate(). */ assert(max_rows > 0); fio_batch_start(batch, max_rows); - while (req != NULL && ! fio_batch_is_full(batch)) { - struct log_row *row = &req->row; - log_row_sign(row); - fio_batch_add(batch, row, log_row_size(row)); + + struct iovec iov[XLOG_ROW_IOVMAX]; + while (req != NULL && !fio_batch_has_space(batch, nelem(iov))) { + int iovcnt = xlog_encode_row(req->packet, iov, req->wal_fixheader); + fio_batch_add(batch, iov, iovcnt); req = STAILQ_NEXT(req, wal_fifo_entry); } return req; @@ -1137,7 +1120,7 @@ wal_write_to_disk(struct recovery_state *r, struct wal_writer *writer, while (req) { if (wal_opt_rotate(wal, r->rows_per_wal, r->wal_dir, - req->row.lsn) != 0) + req->packet->lsn) != 0) break; struct wal_write_request *batch_end; batch_end = wal_fill_batch(*wal, batch, r->rows_per_wal, req); @@ -1194,23 +1177,22 @@ wal_writer_thread(void *worker_args) * to be written to disk and wait until this task is completed. */ int -wal_write(struct recovery_state *r, int64_t lsn, uint64_t cookie, - uint16_t op, const char *row, uint32_t row_len) +wal_write(struct recovery_state *r, struct iproto_packet *packet) { assert(r->wal_mode != WAL_NONE); - say_debug("wal_write lsn=%" PRIi64, lsn); + say_debug("wal_write lsn=%" PRIi64, packet->lsn); ERROR_INJECT_RETURN(ERRINJ_WAL_IO); struct wal_writer *writer = r->writer; struct wal_write_request *req = (struct wal_write_request *) - region_alloc(&fiber()->gc, sizeof(struct wal_write_request) + - sizeof(op) + row_len); + region_alloc(&fiber()->gc, sizeof(struct wal_write_request)); req->fiber = fiber(); req->res = -1; - log_row_fill(&req->row, lsn, cookie, (const char *) &op, - sizeof(op), row, row_len); + req->packet = packet; + packet->tm = ev_now(loop()); + packet->sync = 0; (void) tt_pthread_mutex_lock(&writer->mutex); @@ -1232,9 +1214,7 @@ wal_write(struct recovery_state *r, int64_t lsn, uint64_t cookie, /* {{{ box.snapshot() */ void -snapshot_write_row(struct log_io *l, - const char *metadata, size_t metadata_len, - const char *data, size_t data_len) +snapshot_write_row(struct log_io *l, struct iproto_packet *packet) { static int rows; static uint64_t bytes; @@ -1242,24 +1222,26 @@ snapshot_write_row(struct log_io *l, static ev_tstamp last = 0; ev_loop *loop = loop(); - struct log_row *row = (struct log_row *) region_alloc(&fiber()->gc, - sizeof(*row) + data_len + metadata_len); - - log_row_fill(row, ++rows, snapshot_cookie, metadata, - metadata_len, data, data_len); - log_row_sign(row); - - if (rows % 100000 == 0) - say_crit("%.1fM rows written", rows / 1000000.); + packet->tm = last; + packet->lsn = ++rows; + packet->sync = 0; /* don't write sync to wal */ - size_t written = fwrite(row, 1, log_row_size(row), l->f); + char fixheader[XLOG_FIXHEADER_SIZE]; + struct iovec iov[XLOG_ROW_IOVMAX]; + int iovcnt = xlog_encode_row(packet, iov, fixheader); - if (written != log_row_size(row)) { - say_error("Can't write row (%zu bytes)", log_row_size(row)); - panic_syserror("snapshot_write_row"); + /* TODO: use writev here */ + for (int i = 0; i < iovcnt; i++) { + if (fwrite(iov[i].iov_base, iov[i].iov_len, 1, l->f) != 1) { + say_error("Can't write row (%zu bytes)", + iov[i].iov_len); + panic_syserror("snapshot_write_row"); + } + bytes += iov[i].iov_len; } - bytes += written; + if (rows % 100000 == 0) + say_crit("%.1fM rows written", rows / 1000000.); region_free_after(&fiber()->gc, 128 * 1024); @@ -1302,8 +1284,9 @@ snapshot_write_row(struct log_io *l, } void -snapshot_save(struct recovery_state *r, void (*f) (struct log_io *)) +snapshot_save(struct recovery_state *r) { + assert(r->snapshot_handler != NULL); struct log_io *snap; snap = log_io_open_for_write(r->snap_dir, r->confirmed_lsn, INPROGRESS); @@ -1317,8 +1300,8 @@ snapshot_save(struct recovery_state *r, void (*f) (struct log_io *)) say_info("saving snapshot `%s'", format_filename(r->snap_dir, r->confirmed_lsn, NONE)); - if (f) - f(snap); + + r->snapshot_handler(snap); log_io_close(&snap); diff --git a/src/recovery.h b/src/recovery.h index 03b31cdfad..2d1eac753a 100644 --- a/src/recovery.h +++ b/src/recovery.h @@ -32,6 +32,7 @@ #include "trivia/util.h" #include "tarantool_ev.h" +#include "log_io.h" #if defined(__cplusplus) extern "C" { @@ -40,7 +41,8 @@ extern "C" { struct fiber; struct tbuf; -typedef int (row_handler)(void *, const struct log_row *row); +typedef int (row_handler)(void *, struct iproto_packet *packet); +typedef void (snapshot_handler)(struct log_io *); /** A "condition variable" that allows fibers to wait when a given * LSN makes it to disk. @@ -88,6 +90,7 @@ struct recovery_state { */ row_handler *row_handler; void *row_handler_param; + snapshot_handler *snapshot_handler; uint64_t snap_io_rate_limit; int rows_per_wal; double wal_fsync_delay; @@ -101,18 +104,20 @@ extern struct recovery_state *recovery_state; void recovery_init(const char *snap_dirname, const char *xlog_dirname, row_handler row_handler, void *row_handler_param, - int rows_per_wal); + snapshot_handler snapshot_handler, int rows_per_wal); void recovery_update_mode(struct recovery_state *r, const char *wal_mode, double fsync_delay); void recovery_update_io_rate_limit(struct recovery_state *r, double new_limit); void recovery_free(); -void recover_snap(struct recovery_state *, const char *replication_source); +void recover_snap(struct recovery_state *r, const char *replication_source); void recover_existing_wals(struct recovery_state *); void recovery_follow_local(struct recovery_state *r, ev_tstamp wal_dir_rescan_delay); void recovery_finalize(struct recovery_state *r); -int wal_write(struct recovery_state *r, int64_t lsn, uint64_t cookie, - uint16_t op, const char *data, uint32_t len); + +int +recover_wal(struct recovery_state *r, struct log_io *l); /* for replication */ +int wal_write(struct recovery_state *r, struct iproto_packet *packet); void recovery_setup_panic(struct recovery_state *r, bool on_snap_error, bool on_wal_error); @@ -124,13 +129,12 @@ void recovery_wait_lsn(struct recovery_state *r, int64_t lsn); struct fio_batch; -void snapshot_write_row(struct log_io *i, - const char *metadata, size_t metadata_size, - const char *data, size_t data_size); -void snapshot_save(struct recovery_state *r, void (*loop) (struct log_io *)); +void +snapshot_write_row(struct log_io *l, struct iproto_packet *packet); +void snapshot_save(struct recovery_state *r); void -init_storage(struct log_dir *dir, const char *replication_source); +init_storage_on_master(struct log_dir *dir); #if defined(__cplusplus) } /* extern "C" */ diff --git a/src/replica.cc b/src/replica.cc index 18ab56e1a0..66b20dfee5 100644 --- a/src/replica.cc +++ b/src/replica.cc @@ -39,55 +39,86 @@ #include "coio_buf.h" #include "recovery.h" #include "tarantool.h" +#include "iproto.h" #include "iproto_constants.h" #include "msgpuck/msgpuck.h" #include "replica.h" static void -remote_apply_row(struct recovery_state *r, const struct log_row *row); +remote_apply_row(struct recovery_state *r, struct iproto_packet *packet); -static const struct log_row * -remote_read_row(struct ev_io *coio, struct iobuf *iobuf) +static void +remote_remote_read_row_fd(struct ev_io *coio, struct iobuf *iobuf, + struct iproto_packet *packet) { struct ibuf *in = &iobuf->in; - ssize_t to_read = sizeof(struct log_row) - ibuf_size(in); - if (to_read > 0) { - ibuf_reserve(in, cfg_readahead); - coio_breadn(coio, in, to_read); + /* Read fixed header */ + if (ibuf_size(in) < IPROTO_FIXHEADER_SIZE) + coio_breadn(coio, in, IPROTO_FIXHEADER_SIZE - ibuf_size(in)); + + /* Read length */ + if (mp_typeof(*in->pos) != MP_UINT) { + tnt_raise(ClientError, ER_INVALID_MSGPACK, + "invalid fixed header"); } - ssize_t request_len = ((const struct log_row *) in->pos)->len + - sizeof(struct log_row); - to_read = request_len - ibuf_size(in); + const char *data = in->pos; + uint32_t len = mp_decode_uint(&data); + if (len > IPROTO_BODY_LEN_MAX) { + tnt_raise(ClientError, ER_INVALID_MSGPACK, + "received packet is too big"); + } + in->pos += IPROTO_FIXHEADER_SIZE; + /* Read header and body */ + ssize_t to_read = len - ibuf_size(in); if (to_read > 0) coio_breadn(coio, in, to_read); - const struct log_row *row = (const struct log_row *) in->pos; - in->pos += request_len; - return row; + iproto_packet_decode(packet, (const char **) &in->pos, in->pos + len); } -struct iproto_subscribe_request { - uint8_t v_len; /* length */ - uint8_t m_header; /* MP_MAP */ - uint8_t k_code; /* IPROTO_CODE */ - uint8_t v_code; /* response status */ - uint8_t m_body; /* MP_MAP */ - uint8_t k_data; /* IPROTO_OFFSET */ - uint8_t m_data; /* MP_UINT64 */ - uint64_t lsn; /* lsn */ -} __attribute__((packed)); - -static const struct iproto_subscribe_request iproto_subscribe_request = { - sizeof(struct iproto_subscribe_request) - 1, - 0x81, IPROTO_CODE, IPROTO_SUBSCRIBE, - 0x81, IPROTO_OFFSET, 0xcf, 0 -}; - -int -replica_bootstrap(const char *replication_source) +/* Blocked I/O */ +static void +remote_read_row_fd(int sock, struct iproto_packet *packet) +{ + const char *data; + + /* Read fixed header */ + char fixheader[IPROTO_FIXHEADER_SIZE]; + if (sio_read(sock, fixheader, sizeof(fixheader)) != sizeof(fixheader)) { +error: + tnt_raise(ClientError, ER_INVALID_MSGPACK, + "invalid fixed header"); + } + data = fixheader; + if (mp_check(&data, data + sizeof(fixheader)) != 0) + goto error; + data = fixheader; + + /* Read length */ + if (mp_typeof(*data) != MP_UINT) + goto error; + uint32_t len = mp_decode_uint(&data); + if (len > IPROTO_BODY_LEN_MAX) { + tnt_raise(ClientError, ER_INVALID_MSGPACK, + "received packet is too big"); + } + + /* Read header and body */ + char *bodybuf = (char *) region_alloc(&fiber()->gc, len); + if (sio_read(sock, bodybuf, len) != len) { + tnt_raise(ClientError, ER_INVALID_MSGPACK, + "invalid row - can't read"); + } + + data = bodybuf; + iproto_packet_decode(packet, &data, data + len); +} + +void +replica_bootstrap(struct recovery_state *r, const char *replication_source) { char ip_addr[32]; char greeting[IPROTO_GREETING_SIZE]; @@ -109,13 +140,36 @@ replica_bootstrap(const char *replication_source) int master = sio_socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); FDGuard guard(master); + + assert(r->confirmed_lsn == 0 && r->lsn == 0); + uint64_t sync = rand(); + + /* Send JOIN request */ + struct iproto_subscribe subscribe = iproto_subscribe_stub; + subscribe.sync = mp_bswap_u64(sync); sio_connect(master, &addr, sizeof(addr)); sio_readn(master, greeting, sizeof(greeting)); - sio_writen(master, &iproto_subscribe_request, - sizeof(iproto_subscribe_request)); + sio_write(master, &subscribe, sizeof(subscribe)); + + while (true) { + struct iproto_packet packet; + + remote_read_row_fd(master, &packet); + if (packet.sync != sync) + tnt_raise(IllegalParams, "unexpected packet"); + + /* Recv JOIN response (= end of stream) */ + if (packet.code == IPROTO_SUBSCRIBE) { + if (packet.bodycnt != 0) + tnt_raise(IllegalParams, "subscribe response body"); + set_lsn(r, packet.lsn); + say_info("done"); + break; + } - guard.fd = -1; - return master; + remote_apply_row(r, &packet); + } + /* master socket closed by guard */ } static void @@ -129,7 +183,8 @@ remote_connect(struct ev_io *coio, struct sockaddr_in *remote_addr, coio_connect(coio, remote_addr); coio_readn(coio, greeting, sizeof(greeting)); - struct iproto_subscribe_request request = iproto_subscribe_request; + /* Send JOIN request */ + struct iproto_subscribe request = iproto_subscribe_stub; request.lsn = mp_bswap_u64(initial_lsn); coio_write(coio, &request, sizeof(request)); @@ -165,15 +220,16 @@ pull_from_remote(va_list ap) "connected"); } err = "can't read row"; - const struct log_row *row = remote_read_row(&coio, iobuf); + struct iproto_packet packet; + remote_remote_read_row_fd(&coio, iobuf, &packet); fiber_setcancellable(false); err = NULL; - r->remote->recovery_lag = ev_now(loop) - row->tm; + r->remote->recovery_lag = ev_now(loop) - packet.tm; r->remote->recovery_last_update_tstamp = ev_now(loop); - remote_apply_row(r, row); + remote_apply_row(r, &packet); iobuf_gc(iobuf); fiber_gc(); @@ -213,14 +269,12 @@ pull_from_remote(va_list ap) } static void -remote_apply_row(struct recovery_state *r, const struct log_row *row) +remote_apply_row(struct recovery_state *r, struct iproto_packet *packet) { - assert(row->tag == WAL); - - if (r->row_handler(r->row_handler_param, row) < 0) + if (r->row_handler(r->row_handler_param, packet) < 0) panic("replication failure: can't apply row"); - set_lsn(r, row->lsn); + set_lsn(r, packet->lsn); } void diff --git a/src/replica.h b/src/replica.h index 0820f7dedd..553a4dde01 100644 --- a/src/replica.h +++ b/src/replica.h @@ -48,8 +48,8 @@ struct remote { * @return A connected socket, ready too receive * data. */ -int -replica_bootstrap(const char *replication_source); +void +replica_bootstrap(struct recovery_state *r, const char *replication_source); void recovery_follow_remote(struct recovery_state *r, const char *addr); diff --git a/src/replication.cc b/src/replication.cc index 0a2eb143e1..eca1dd6452 100644 --- a/src/replication.cc +++ b/src/replication.cc @@ -49,6 +49,8 @@ extern "C" { #include "recovery.h" #include "log_io.h" #include "evio.h" +#include "iproto_constants.h" +#include "msgpuck/msgpuck.h" /** Replication topology * ---------------------- @@ -86,6 +88,7 @@ struct replica { int sock; /** Initial lsn. */ int64_t lsn; + uint64_t sync; } replica; /** Send a file descriptor to replication relay spawner. @@ -194,11 +197,12 @@ struct subscribe_request { struct ev_io io; int fd; int64_t lsn; + uint64_t sync; }; /** Replication acceptor fiber handler. */ void -subscribe(int fd, int64_t lsn) +subscribe(int fd, int64_t lsn, uint64_t sync) { struct subscribe_request *request = (struct subscribe_request *) malloc(sizeof(struct subscribe_request)); @@ -209,6 +213,7 @@ subscribe(int fd, int64_t lsn) request->fd = fd; request->io.data = request; request->lsn = lsn; + request->sync = sync; ev_io_init(&request->io, replication_send_socket, master_to_spawner_socket, EV_WRITE); ev_io_start(loop(), &request->io); @@ -227,7 +232,7 @@ replication_send_socket(ev_loop *loop, ev_io *watcher, int /* events */) struct cmsghdr *control_message = NULL; iov.iov_base = &request->lsn; - iov.iov_len = sizeof(request->lsn); + iov.iov_len = sizeof(request->lsn) + sizeof(request->sync); memset(&msg, 0, sizeof(msg)); @@ -342,7 +347,7 @@ spawner_main_loop() char control_buf[CMSG_SPACE(sizeof(int))]; iov.iov_base = &replica.lsn; - iov.iov_len = sizeof(replica.lsn); + iov.iov_len = sizeof(replica.lsn) + sizeof(replica.sync); msg.msg_name = NULL; msg.msg_namelen = 0; @@ -537,56 +542,102 @@ replication_relay_recv(ev_loop * /* loop */, struct ev_io *w, int __attribute__( exit(EXIT_FAILURE); } +/* Only for blocked I/O */ +static inline ssize_t +sio_writev_all(int fd, struct iovec *iov, int iovcnt) +{ + ssize_t bytes_total = 0; + struct iovec *iovend = iov + iovcnt; + while(1) { + ssize_t bytes_written = sio_writev(fd, iov, iovend - iov); + bytes_total += bytes_written; + while (bytes_written >= iov->iov_len) + bytes_written -= (iov++)->iov_len; + if (iov == iovend) + break; + iov->iov_base = (char *) iov->iov_base + bytes_written; + iov->iov_len -= bytes_written; + } + + return bytes_total; +} + + +enum { IPROTO_ROW_IOVMAX = IPROTO_PACKET_IOVMAX + 1 }; + +static int +iproto_encode_row(const struct iproto_packet *packet, struct iovec *iov, + char fixheader[IPROTO_FIXHEADER_SIZE]) +{ + int iovcnt = iproto_packet_encode(packet, iov + 1) + 1; + uint32_t len = 0; + for (int i = 1; i < iovcnt; i++) + len += iov[i].iov_len; + + /* Encode length */ + char *data = fixheader; + data = mp_encode_uint(data, len); + /* Encode padding */ + ssize_t padding = IPROTO_FIXHEADER_SIZE - (data - fixheader); + if (padding > 0) + data = mp_encode_strl(data, padding - 1) + padding - 1; + assert(data == fixheader + IPROTO_FIXHEADER_SIZE); + iov[0].iov_base = fixheader; + iov[0].iov_len = IPROTO_FIXHEADER_SIZE; + + assert(iovcnt <= IPROTO_ROW_IOVMAX); + return iovcnt; +} /** Send a single row to the client. */ static int -replication_relay_send_row(void * /* param */, const struct log_row *row) +replication_relay_send_row(void * /* param */, struct iproto_packet *packet) { - ssize_t bytes, len = log_row_size(row); - while (len > 0) { - bytes = write(replica.sock, row, len); - if (bytes < 0) { - if (errno == EPIPE) { - /* socket closed on opposite site */ - goto shutdown_handler; - } - panic_syserror("write"); - } - len -= bytes; - row += bytes; + try { + packet->sync = replica.sync; + /* Encode length */ + struct iovec iov[IPROTO_ROW_IOVMAX]; + char fixheader[IPROTO_FIXHEADER_SIZE]; + int iovcnt = iproto_encode_row(packet, iov, fixheader); + sio_writev_all(replica.sock, iov, iovcnt); + } catch(SocketError *e) { + say_info("the client has closed its replication socket, exiting"); + exit(EXIT_SUCCESS); } return 0; -shutdown_handler: - say_info("the client has closed its replication socket, exiting"); - exit(EXIT_SUCCESS); } static void -replication_relay_send_snapshot() +replication_relay_join(struct recovery_state *r, uint64_t sync) { FDGuard guard_replica(replica.sock); - struct log_dir dir = snap_dir; - dir.dirname = cfg.snap_dir; - int64_t lsn = greatest_lsn(&dir); - const char *filename = format_filename(&dir, lsn, NONE); - int snapshot = open(filename, O_RDONLY); - if (snapshot < 0) - panic_syserror("can't find/open snapshot"); - - FDGuard guard_snapshot(snapshot); - - struct stat st; - if (fstat(snapshot, &st) != 0) - panic_syserror("fstat"); - - uint64_t header[2]; - header[0] = lsn; - header[1] = st.st_size; - sio_writen(replica.sock, header, sizeof(header)); - sio_sendfile(replica.sock, snapshot, NULL, header[1]); + int64_t lsn = greatest_lsn(r->snap_dir); + if (lsn <= 0) + panic("can't find snapshot"); + + struct log_io *snap = log_io_open_for_read(r->snap_dir, lsn, NONE); + if (snap == NULL) + panic("can't open snapshot"); + say_info("sending snapshot `%s'", snap->filename); + + /* Send rows */ + int rc = recover_wal(r, snap); + log_io_close(&snap); + + if (rc != 0) + panic("can't sent snapshot"); + + /* Send response to JOIN command = end of stream */ + struct iproto_subscribe response = iproto_subscribe_stub; + response.lsn = mp_bswap_u64(lsn); + response.sync = mp_bswap_u64(sync); + sio_write(replica.sock, &response, sizeof(response)); + + say_info("snapshot sent, lsn: %" PRIi64, lsn); exit(EXIT_SUCCESS); + /* replica.sock closed by guard */ } /** The main loop of replication client service process. */ @@ -633,8 +684,6 @@ replication_relay_loop() say_syserror("sigaction"); } - if (replica.lsn == 0) - replication_relay_send_snapshot(); /* exits */ /* * Init a read event: when replica closes its end * of the socket, we can read EOF and shutdown the @@ -649,11 +698,16 @@ replication_relay_loop() /* Initialize the recovery process */ recovery_init(cfg.snap_dir, cfg.wal_dir, replication_relay_send_row, - NULL, INT32_MAX); + NULL, NULL, INT32_MAX); /* * Note that recovery starts with lsn _NEXT_ to * the confirmed one. */ + if (replica.lsn == 0) { + recovery_state->lsn = recovery_state->confirmed_lsn = 0; + replication_relay_join(recovery_state, replica.sync); /* exits */ + } + recovery_state->lsn = recovery_state->confirmed_lsn = replica.lsn - 1; recover_existing_wals(recovery_state); /* Found nothing. */ diff --git a/src/replication.h b/src/replication.h index 40ecc01adf..b42c15c36a 100644 --- a/src/replication.h +++ b/src/replication.h @@ -45,7 +45,7 @@ replication_prefork(); * @return None. On error, closes the socket. */ void -subscribe(int fd, int64_t lsn); +subscribe(int fd, int64_t lsn, uint64_t sync); #endif // TARANTOOL_REPLICATION_H_INCLUDED diff --git a/src/session.cc b/src/session.cc index 8a787fdd72..1ddc10a841 100644 --- a/src/session.cc +++ b/src/session.cc @@ -83,14 +83,7 @@ session_create(int fd, uint64_t cookie) * fiber sid. */ fiber_set_session(fiber(), session); - try { - trigger_run(&session_on_connect, NULL); - } catch (Exception *e) { - fiber_set_session(fiber(), NULL); - mh_i32ptr_remove(session_registry, &node, NULL); - mempool_free(&session_pool, session); - throw; - } + /* Set session user to guest, until it is authenticated. */ session_set_user(session, GUEST, GUEST); return session; diff --git a/test/box/bad_trigger.result b/test/box/bad_trigger.result index b40377d171..a7421fe612 100644 --- a/test/box/bad_trigger.result +++ b/test/box/bad_trigger.result @@ -10,7 +10,12 @@ box.session.on_connect(f1) --- ... select * from t0 where k0=0 -Connection is dead: Connection reset by peer. +--- +- error: + errcode: ER_PROC_LUA + errmsg: [string "function f1() nosuchfunction() end"]:1: attempt to call global 'nosuchfunction' (a nil value) +... +Connection is alive. box.session.on_connect(nil, f1) --- diff --git a/test/box/dup_key1.xlog b/test/box/dup_key1.xlog index 08563a9fb2cb9836509d28764624e13ae2c2d2dd..2821f7db0b489ca138f7ce3d8869f384c1c96471 100644 GIT binary patch literal 247 zcma#>@ptDk&@(jR;<~zvd$l^lxv;=9=avURKr;gqGZV{c$17*z+&bIEn*`1>N+?b{ z%f!IAq$IVtWNB_{Zb=2h0+3EUhI1D`2W>#s$qdtZS_q_5L2(99C&TiBqRiaHqRJ&D zMX9Nbj0}tON^?O5C^MXEeH_w(Yyb<)fV*u#1DF^TCo!&0%PcA`Q79=b$Vmn1QeilE ff5!O=WL>N<UGLUFbuq0ePEF3wOMz+=So;<LyQ5!; literal 281 zcma#>@ptDk&@(jR;(EK9d)G&ki{F?SAfRlsdcf}ER~$uv+<*U}f(-XB3z{=AGzpw# zlu(>{mWhFJNl9vP3Bv-AZjJoE$;?oNO?O`gf^;jQ=-zbG?F&S=g5nIIZieLrMVYyY zMU_iRic(V<85tJmmF9ws3bGXX$^tcN*@le;Afv=kjCv#U;W*eRCI-bxjH}Z!i;7DW xN=gfIQbGFryH_W$LiN8fy;Ka+FOH(0ElKA&TtCyA;?(5)ycCE5a;w)000180T0H;& diff --git a/test/box/dup_key2.xlog b/test/box/dup_key2.xlog index fb10436343275d0d4f7b12af4a3746fa56f0c55b..052bfac1cd3392ee0609a962720aaeebf04eb2ef 100644 GIT binary patch literal 124 zcma#>@ptDk&@(jR;<~zvd$lsdx%7~`n#%(qpqYV*nT_SN<CQaUVc~C9Gzpw#Vo;pK zxVj`GvnWNOq_iL>6{Jgr;hd0XI}@@lc8IQsz76?MT}*4z@=J?KGN9T7*1iP*lMF4P literal 144 zcma#>@ptDk&@(jR;(EK9dspexKzTL>2x#rmUc3AF6-O~3_uqf0Aj9UBoz+YXO#)|` z7!)Tlt}e;QEJ{%*DJ{rJ1?iu1pK}^JRH>NTl64^c;wbuGM`gc&>t|Y%mS0*_k^wP5 IZuMFL06E$$^Z)<= diff --git a/test/box/unfinished.xlog b/test/box/unfinished.xlog index e689a9c7aedf8aa1874da05932643372abb40cec..5ebfa99f87438a2dbf7f67843b2b6e22f7a88e68 100644 GIT binary patch literal 122 zcma#>@ptDk&@(jR;<~zvd$l6NxxURpipv8apqYW0nU&?V<CQZ^cked7ZW3TnoW!!) zEx)KFL!qR!ASV^1$B^ON|Fe%pk@c`a^gNi?a1X49b$4i9T4r8maYky2LQ!gReo+b+ E0Hjwf_W%F@ literal 79 zcma#>@ptDk&@(jR;(EK9d)K!ee%n|XAfR<lbGYHoD~?h??!W&~L54Z{%8E=3O^TCP ZH-_e=W#(lTXQZYm6s0ET7o`X=0050S7WV)E -- GitLab