diff --git a/src/00000000000000000000.snap b/src/00000000000000000000.snap deleted file mode 100644 index 1f5eaddfb3cb1d59c61291a77e2264b4d6b07b6e..0000000000000000000000000000000000000000 Binary files a/src/00000000000000000000.snap and /dev/null differ diff --git a/src/box/alter.cc b/src/box/alter.cc index 49f315dde7226710b5c48b4b1b5f8d56818379e1..7766800aec0688b58ca2db4072fc3c43c9f70673 100644 --- a/src/box/alter.cc +++ b/src/box/alter.cc @@ -26,6 +26,8 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ +#include "evio.h" +#include "replication.h" #include "alter.h" #include "schema.h" #include "user_def.h" diff --git a/src/box/engine.cc b/src/box/engine.cc index 398cd3c70dd8c062da3761bc8a50f12b20cfaa0a..c8539524a6cc9e511b4454c11d3ac2d2886a90bf 100644 --- a/src/box/engine.cc +++ b/src/box/engine.cc @@ -26,6 +26,8 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ +#include "evio.h" +#include "replication.h" #include "engine.h" #include "space.h" #include "exception.h" @@ -181,10 +183,10 @@ engine_checkpoint(int64_t checkpoint_id) } void -engine_join(struct recovery_state *r) +engine_join(Relay *relay) { Engine *engine; engine_foreach(engine) { - engine->join(r); + engine->join(relay); } } diff --git a/src/box/engine.h b/src/box/engine.h index 2af199922b17be5cc949f8c47f4858e0ec6c2f42..f1f0111540834bc453f143c81b3b884eff07a330 100644 --- a/src/box/engine.h +++ b/src/box/engine.h @@ -119,7 +119,7 @@ class Engine: public Object { */ virtual void dropIndex(Index*) = 0; - virtual void join(struct recovery_state*) = 0; + virtual void join(Relay*) = 0; /** * Engine specific transaction life-cycle routines. */ @@ -260,6 +260,6 @@ engine_checkpoint(int64_t checkpoint_id); * Send a snapshot. */ void -engine_join(struct recovery_state*); +engine_join(Relay*); #endif /* TARANTOOL_BOX_ENGINE_H_INCLUDED */ diff --git a/src/box/key_def.cc b/src/box/key_def.cc index 2f7fff6bee4aed9a70e5a1e1b0c49e411ece57cf..afd9d6e374e686f3ae2a7cc44b89b3e637bb0150 100644 --- a/src/box/key_def.cc +++ b/src/box/key_def.cc @@ -26,6 +26,8 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ +#include "evio.h" +#include "replication.h" #include "key_def.h" #include "space.h" #include "schema.h" diff --git a/src/box/lua/call.cc b/src/box/lua/call.cc index 8c27328020cdb89423aa144ec6bee1be5a757473..15ab23bfb89bf2f113b871073668e7ca3a3f81f4 100644 --- a/src/box/lua/call.cc +++ b/src/box/lua/call.cc @@ -26,6 +26,8 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ +#include "evio.h" +#include "box/replication.h" #include "box/lua/call.h" #include "pickle.h" diff --git a/src/box/lua/index.cc b/src/box/lua/index.cc index 0d2336a9d6115d330911ea765a11d2a8cd2dbd7b..2409fc16c8a2431485402c9497581605fd6258b7 100644 --- a/src/box/lua/index.cc +++ b/src/box/lua/index.cc @@ -26,6 +26,8 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ +#include "evio.h" +#include "box/replication.h" #include "box/lua/index.h" #include "lua/utils.h" #include "box/index.h" diff --git a/src/box/lua/space.cc b/src/box/lua/space.cc index baa15f906b2522961c60fb913831d53efb2e2c04..93e9912d6e819a8e785383abd53bfa7f7e18133a 100644 --- a/src/box/lua/space.cc +++ b/src/box/lua/space.cc @@ -26,6 +26,8 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ +#include "evio.h" +#include "box/replication.h" #include "box/lua/space.h" #include "box/lua/tuple.h" #include "lua/utils.h" diff --git a/src/box/memtx_engine.cc b/src/box/memtx_engine.cc index 0c3887196cf98eed39ebe2779087ba25e6e6a1df..16e4feb8353b5e88aa42298c2ecb2fd48068b997 100644 --- a/src/box/memtx_engine.cc +++ b/src/box/memtx_engine.cc @@ -26,6 +26,8 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ +#include "evio.h" +#include "replication.h" #include "memtx_engine.h" #include "tuple.h" #include "txn.h" @@ -35,6 +37,7 @@ #include "memtx_rtree.h" #include "memtx_bitset.h" #include "space.h" +#include "request.h" #include "salad/rlist.h" #include <stdlib.h> #include <string.h> @@ -112,9 +115,9 @@ MemtxEngine::end_recovery() } void -MemtxEngine::join(struct recovery_state *r) +MemtxEngine::join(Relay *relay) { - recover_snap(r); + recover_snap(relay->r); } Handler *MemtxEngine::open() @@ -247,15 +250,6 @@ MemtxEngine::begin_recover_snapshot(int64_t /* lsn */) */ } -/** The snapshot row metadata repeats the structure of REPLACE request. */ -struct request_replace_body { - uint8_t m_body; - uint8_t k_space_id; - uint8_t m_space_id; - uint32_t v_space_id; - uint8_t k_tuple; -} __attribute__((packed)); - static void snapshot_write_row(struct recovery_state *r, struct xlog *l, struct xrow_header *row) diff --git a/src/box/memtx_engine.h b/src/box/memtx_engine.h index 38c5c88ad3a762e791bf6defc7fa1158bac82c0a..1d0015796446193c01f6bc0fd156b011961b6d04 100644 --- a/src/box/memtx_engine.h +++ b/src/box/memtx_engine.h @@ -40,7 +40,7 @@ struct MemtxEngine: public Engine { virtual void begin_recover_snapshot(int64_t lsn); virtual void end_recover_snapshot(); virtual void end_recovery(); - virtual void join(struct recovery_state*); + virtual void join(Relay*); virtual int begin_checkpoint(int64_t); virtual int wait_checkpoint(); virtual void commit_checkpoint(); diff --git a/src/box/memtx_hash.cc b/src/box/memtx_hash.cc index 4afad8dac1681834a15daa96bbbe37a917eb2e4a..62f91128eaff07fb670a078ca10ed5ecb1befdf8 100644 --- a/src/box/memtx_hash.cc +++ b/src/box/memtx_hash.cc @@ -26,6 +26,8 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ +#include "evio.h" +#include "replication.h" #include "memtx_hash.h" #include "say.h" #include "tuple.h" diff --git a/src/box/memtx_rtree.cc b/src/box/memtx_rtree.cc index 6a6bc037e723004651b53ebbf08cdb45d1380891..55332f28ae3db2426795e0ac25b59a4f1344418d 100644 --- a/src/box/memtx_rtree.cc +++ b/src/box/memtx_rtree.cc @@ -26,6 +26,8 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ +#include "evio.h" +#include "replication.h" #include "memtx_rtree.h" #include "tuple.h" #include "space.h" diff --git a/src/box/memtx_tree.cc b/src/box/memtx_tree.cc index 528c156e0829462736dc0519e4c8caba116498e3..95734edece98c4290b99954d7fc4a16a9874117e 100644 --- a/src/box/memtx_tree.cc +++ b/src/box/memtx_tree.cc @@ -26,6 +26,8 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ +#include "evio.h" +#include "replication.h" #include "memtx_tree.h" #include "tuple.h" #include "space.h" diff --git a/src/box/replication.cc b/src/box/replication.cc index 667b5b1e8bcf408b6fe4f623ad434febc04ebd53..b8c01bd02d228d867a828edfba104c170c843221 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -26,13 +26,13 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ +#include "evio.h" #include "replication.h" #include <say.h> #include <fiber.h> #include "recovery.h" #include "xlog.h" -#include "evio.h" #include "iproto_constants.h" #include "box/engine.h" #include "box/cluster.h" @@ -45,33 +45,24 @@ #include "cfg.h" #include "trigger.h" +Relay::Relay(int fd_arg, uint64_t sync_arg) +{ + r = recovery_new(cfg_gets("snap_dir"), cfg_gets("wal_dir"), + replication_send_row, this); + coio_init(&io); + io.fd = fd_arg; + sync = sync_arg; +} + +Relay::~Relay() +{ + recovery_delete(r); +} + void replication_send_row(struct recovery_state *r, void *param, struct xrow_header *packet); -/** State of a replication relay. */ -class Relay { -public: - /** Replica connection */ - struct ev_io io; - /* Request sync */ - uint64_t sync; - struct recovery_state *r; - - Relay(int fd_arg, uint64_t sync_arg) - { - r = recovery_new(cfg_gets("snap_dir"), cfg_gets("wal_dir"), - replication_send_row, this); - coio_init(&io); - io.fd = fd_arg; - sync = sync_arg; - } - ~Relay() - { - recovery_delete(r); - } -}; - static inline void relay_set_cord_name(int fd) { @@ -93,7 +84,7 @@ replication_join_f(va_list ap) relay_set_cord_name(relay->io.fd); /* Send snapshot */ - engine_join(r); + engine_join(relay); /* Send response to JOIN command = end of stream */ struct xrow_header row; diff --git a/src/box/replication.h b/src/box/replication.h index 354553186dadbae1cbc7b41c17ce56e75bf1b504..25cc32b265032d89050b36caa9ec6c34616bb412 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -28,8 +28,21 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ + struct xrow_header; +/** State of a replication relay. */ +class Relay { +public: + /** Replica connection */ + struct ev_io io; + /* Request sync */ + uint64_t sync; + struct recovery_state *r; + Relay(int fd_arg, uint64_t sync_arg); + ~Relay(); +}; + void replication_join(int fd, struct xrow_header *packet); diff --git a/src/box/request.cc b/src/box/request.cc index dd8d7eea598db4f4e4313ddd50f574124adbebad..304d77bac5930e1df3d388b923014e18958e754a 100644 --- a/src/box/request.cc +++ b/src/box/request.cc @@ -26,6 +26,8 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ +#include "evio.h" +#include "replication.h" #include "request.h" #include "engine.h" #include "txn.h" diff --git a/src/box/request.h b/src/box/request.h index b63f3dd74da04c797e85f2153dd55d09c1fcaab3..745f1e2ed8c03b958b6578eb646e35b766e2f633 100644 --- a/src/box/request.h +++ b/src/box/request.h @@ -62,6 +62,15 @@ struct request int field_base; }; +/** The snapshot row metadata repeats the structure of REPLACE request. */ +struct request_replace_body { + uint8_t m_body; + uint8_t k_space_id; + uint8_t m_space_id; + uint32_t v_space_id; + uint8_t k_tuple; +} __attribute__((packed)); + void request_create(struct request *request, uint32_t code); diff --git a/src/box/schema.cc b/src/box/schema.cc index e2db3c979fa131187dfa03c35358a25751142376..338b0cd35aa52d062c9a83dfc08f36bfb39f96b3 100644 --- a/src/box/schema.cc +++ b/src/box/schema.cc @@ -26,6 +26,8 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ +#include "evio.h" +#include "replication.h" #include "schema.h" #include "user_def.h" #include "engine.h" diff --git a/src/box/sophia_engine.cc b/src/box/sophia_engine.cc index 063601241cbf7274e1fae3a6d99365bb7c866b04..fa69a0306963e3a93f065002ade7f919cfd06679 100644 --- a/src/box/sophia_engine.cc +++ b/src/box/sophia_engine.cc @@ -26,6 +26,8 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ +#include "evio.h" +#include "replication.h" #include "sophia_engine.h" #include "cfg.h" #include "xrow.h" @@ -90,9 +92,6 @@ sophia_recovery_end(struct space *space) r->state = READY_ALL_KEYS; r->replace = sophia_replace; r->recover = space_noop; - /* - sophia_complete_recovery(space); - */ } static void @@ -157,32 +156,34 @@ SophiaEngine::end_recover_snapshot() } static inline void -sophia_send_row(struct recovery_state *r, char *tuple, +sophia_send_row(Relay *relay, uint32_t space_id, char *tuple, uint32_t tuple_size) { - struct request req; - request_create(&req, IPROTO_REPLACE); - req.space_id = 0; - req.index_id = 0; - req.tuple = tuple; - req.tuple_end = tuple + tuple_size; - + struct recovery_state *r = relay->r; + struct request_replace_body body; + body.m_body = 0x82; /* map of two elements. */ + body.k_space_id = IPROTO_SPACE_ID; + body.m_space_id = 0xce; /* uint32 */ + body.v_space_id = mp_bswap_u32(space_id); + body.k_tuple = IPROTO_TUPLE; struct xrow_header row; - row.type = IPROTO_REPLACE; - row.lsn = 0; - row.server_id = 0; - row.bodycnt = request_encode(&req, row.body); - - replication_send_row(r, /* Relay* */ NULL, &row); + row.type = IPROTO_INSERT; + row.lsn = vclock_inc(&r->vclock, r->server_id); + row.server_id = r->server_id; + row.bodycnt = 2; + row.body[0].iov_base = &body; + row.body[0].iov_len = sizeof(body); + row.body[1].iov_base = tuple; + row.body[1].iov_len = tuple_size; + replication_send_row(r, relay, &row); } void -SophiaEngine::join(struct recovery_state *r) +SophiaEngine::join(Relay *relay) { - return; - struct vclock *res = vclockset_last(&r->snap_dir.index); + struct vclock *res = vclockset_last(&relay->r->snap_dir.index); if (res == NULL) tnt_raise(ClientError, ER_MISSING_SNAPSHOT); int64_t signt = vclock_signature(res); @@ -201,6 +202,13 @@ SophiaEngine::join(struct recovery_state *r) sophia_raise(env); while (sp_get(db_cursor)) { void *db = sp_object(db_cursor); + + /* get space id */ + void *dbctl = sp_ctl(db); + void *oid = sp_get(dbctl, "id"); + uint32_t space_id = *(uint32_t*)sp_get(oid, "value", NULL); + sp_destroy(oid); + /* send database */ void *o = sp_object(db); void *cursor = sp_cursor(snapshot, o); @@ -213,7 +221,7 @@ SophiaEngine::join(struct recovery_state *r) uint32_t tuple_size = 0; char *tuple = (char *)sp_get(o, "value", &tuple_size); try { - sophia_send_row(r, tuple, tuple_size); + sophia_send_row(relay, space_id, tuple, tuple_size); } catch (...) { sp_destroy(cursor); sp_destroy(db_cursor); @@ -233,7 +241,7 @@ SophiaEngine::end_recovery() { /* create snapshot reference after tarantool * recovery, to ensure correct ref - * counting. */ + * counting */ if (m_checkpoint_lsn >= 0) { sophia_snapshot_recover(env, m_checkpoint_lsn); m_prev_checkpoint_lsn = m_checkpoint_lsn; diff --git a/src/box/sophia_engine.h b/src/box/sophia_engine.h index 7c1c8dd6e43948c795cf3a761b266237336cf6e2..535dfb2d3af0b8b140fd91417d8164ceadd5596b 100644 --- a/src/box/sophia_engine.h +++ b/src/box/sophia_engine.h @@ -43,7 +43,7 @@ struct SophiaEngine: public Engine { virtual void begin_recover_snapshot(int64_t); virtual void end_recover_snapshot(); virtual void end_recovery(); - virtual void join(struct recovery_state*); + virtual void join(Relay*); virtual int begin_checkpoint(int64_t); virtual int wait_checkpoint(); virtual void commit_checkpoint(); diff --git a/src/box/sophia_index.cc b/src/box/sophia_index.cc index d81d2b6ca21575136899f70749e3f34db79535a7..0a2340ce03e72768ea453499b83e07448ff255d1 100644 --- a/src/box/sophia_index.cc +++ b/src/box/sophia_index.cc @@ -26,6 +26,8 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ +#include "evio.h" +#include "replication.h" #include "sophia_index.h" #include "say.h" #include "tuple.h" diff --git a/src/box/space.cc b/src/box/space.cc index ae197445b42d165a3b8f233d0fc430f03e21e4fd..bf51032769e05d887b9be03fd1e53a5ceef7824c 100644 --- a/src/box/space.cc +++ b/src/box/space.cc @@ -26,6 +26,8 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ +#include "evio.h" +#include "replication.h" #include "space.h" #include <stdlib.h> #include <string.h> diff --git a/src/box/txn.cc b/src/box/txn.cc index 30c04c3aa0d5ed065fe4fcb57212dca9a77f7114..c375f71202f07181b3ba8752125eecc70064416c 100644 --- a/src/box/txn.cc +++ b/src/box/txn.cc @@ -26,6 +26,8 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ +#include "evio.h" +#include "replication.h" #include "engine.h" #include "txn.h" #include "box.h" diff --git a/src/box/user.cc b/src/box/user.cc index c3a4aab31f2b0a08052396e0b848fe88f1967f79..f80059a86ce0c8572bcd8a9adc1e162dbcf30d97 100644 --- a/src/box/user.cc +++ b/src/box/user.cc @@ -26,6 +26,8 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ +#include "evio.h" +#include "replication.h" #include "user.h" #include "user_def.h" #include "assoc.h" diff --git a/src/ffisyms.cc b/src/ffisyms.cc index e7dafaf0056eef6e83101b94257385dd6a56eb78..151242b93dc9fde9082421ad20586b7c113c4c5a 100644 --- a/src/ffisyms.cc +++ b/src/ffisyms.cc @@ -1,6 +1,9 @@ + #include <bit/bit.h> #include <lib/msgpuck/msgpuck.h> #include "scramble.h" +#include <evio.h> +#include <box/replication.h> #include <box/box.h> #include <box/tuple.h> #include <box/lua/index.h>