diff --git a/src/box/recovery.cc b/src/box/recovery.cc index 95c2bcaf1c062c22ef43a72fecab7e596219c564..6600e8b4eb0aec10c3be42bc7dcda80d6985eb76 100644 --- a/src/box/recovery.cc +++ b/src/box/recovery.cc @@ -265,7 +265,6 @@ recovery_apply_row(struct recovery_state *r, struct xrow_header *row) { /* Check lsn */ int64_t current_lsn = vclock_get(&r->vclock, row->server_id); - assert(current_lsn >= 0); if (row->lsn > current_lsn) r->apply_row(r, r->apply_row_param, row); } diff --git a/test-run b/test-run index 7a9dffa16d0da05ec33fe0b131e92f6d86a87fd6..6b4dcd67126709a92e722a62c8a87a6ec90ceb0f 160000 --- a/test-run +++ b/test-run @@ -1 +1 @@ -Subproject commit 7a9dffa16d0da05ec33fe0b131e92f6d86a87fd6 +Subproject commit 6b4dcd67126709a92e722a62c8a87a6ec90ceb0f diff --git a/test/replication/conflict.result b/test/replication/conflict.result new file mode 100644 index 0000000000000000000000000000000000000000..7bc5dceb093750395e51f200ec3cf2a43e8546ca --- /dev/null +++ b/test/replication/conflict.result @@ -0,0 +1,63 @@ +box.schema.user.grant('guest', 'replication') +--- +... +reset master-master replication +parallel send: box.space.test:update(1, {{'#', 2, 1}}) +parallel send: box.space.test:update(1, {{'#', 2, 1}}) +replication state is correct +box.space.test:select{1} +--- +- - [1] +... +box.space.test:select{1} +--- +- - [1] +... +reset master-master replication +parallel send: box.space.test:insert{20, 1} +parallel send: box.space.test:insert{20, 2} +replication state is correct +reset master-master replication +parallel send: box.space.test:update(2, {{'=', 2, 1}}) +parallel send: box.space.test:update(2, {{'=', 2, 2}}) +replication state is correct +reset master-master replication +parallel send: box.space.test:update(1, {{'+', 2, 1}}) +parallel send: box.space.test:update(1, {{'+', 2, 2}}) +replication state is correct +box.space.test:select{1} +--- +- - [1, 4] +... +box.space.test:select{1} +--- +- - [1, 4] +... +reset master-master replication +parallel send: box.space.test:delete(999) +parallel send: box.space.test:delete(999) +replication state is correct +box.space.test:select{} +--- +- - [1, 1] + - [2, 4] + - [3, 9] + - [4, 16] + - [5, 25] + - [6, 36] + - [7, 49] + - [8, 64] + - [9, 81] +... +box.space.test:select{} +--- +- - [1, 1] + - [2, 4] + - [3, 9] + - [4, 16] + - [5, 25] + - [6, 36] + - [7, 49] + - [8, 64] + - [9, 81] +... diff --git a/test/replication/conflict.test.py b/test/replication/conflict.test.py new file mode 100644 index 0000000000000000000000000000000000000000..f482fa6e7d2e401d116eb84dd1d6ec8e3bddab5c --- /dev/null +++ b/test/replication/conflict.test.py @@ -0,0 +1,120 @@ +from lib.tarantool_server import TarantoolServer +from time import sleep +import yaml + +def check_replication(nodes, select_args=''): + for node in nodes: + node.admin('box.space.test:select{%s}' % select_args) + +master = server +master.admin("box.schema.user.grant('guest', 'replication')") + +replica = TarantoolServer(server.ini) +replica.script = 'replication/replica.lua' +replica.vardir = server.vardir +replica.rpl_master = master +replica.deploy() + +def parallel_run(cmd1, cmd2, compare): + print 'parallel send: %s' % cmd1 + print 'parallel send: %s' % cmd2 + master.admin.socket.sendall('%s\n' % cmd1) + replica.admin.socket.sendall('%s\n' % cmd2) + + master.admin.socket.recv(2048) + replica.admin.socket.recv(2048) + + # wait for status changing in tarantool + master_status = yaml.load(master.admin( + 'box.info().replication.status', silent=True + ))[0] + replica_status = yaml.load(replica.admin( + 'box.info().replication.status', silent=True + ))[0] + + # wait for status + results = [f(master_status, replica_status) for f in compare] + while True: + sleep(0.01) + if any(results): + print 'replication state is correct' + break + +def prepare_cluster(): + print 'reset master-master replication' + master.stop() + master.cleanup(True) + master.start() + master.admin("box.schema.user.grant('guest', 'replication')", silent=True) + + replica.stop() + replica.cleanup(True) + replica.start() + + master.admin("box.cfg{replication_source='%s'}" % replica.iproto.uri, silent=True) + r1_id = replica.get_param('server')['id'] + r2_id = master.get_param('server')['id'] + + master.admin("space = box.schema.space.create('test')", silent=True) + master.admin("index = space:create_index('primary', { type = 'tree'})", silent=True) + master.admin('for k = 1, 9 do space:insert{k, k*k} end', silent=True) + + # wait lsn + replica.wait_lsn(r2_id, master.get_lsn(r2_id)) + master.wait_lsn(r1_id, replica.get_lsn(r1_id)) + +# test1: double update in master and replica +prepare_cluster() +parallel_run( + "box.space.test:update(1, {{'#', 2, 1}})", + "box.space.test:update(1, {{'#', 2, 1}})", + [ + lambda x,y: x == 'stopped' or y == 'stopped', + lambda x,y: x == 'connected' and y == 'connected', + ] +) +check_replication([master, replica], '1') + +# test2: insert different values with single id +prepare_cluster() +parallel_run( + 'box.space.test:insert{20, 1}', + 'box.space.test:insert{20, 2}', + [ + lambda x,y: x == 'stopped' or y == 'stopped', + lambda x,y: x == 'connected' and y == 'connected', + ] +) + +# test3: update different values +prepare_cluster() +parallel_run( + "box.space.test:update(2, {{'=', 2, 1}})", + "box.space.test:update(2, {{'=', 2, 2}})", + [lambda x,y: x == 'connected' and y == 'connected',] +) + +# test4: CRDT increment with update +prepare_cluster() +parallel_run( + "box.space.test:update(1, {{'+', 2, 1}})", + "box.space.test:update(1, {{'+', 2, 2}})", + [lambda x,y: x == 'connected' and y == 'connected',] +) +check_replication([master, replica], '1') + +# test5: delete not existing key +prepare_cluster() +parallel_run( + "box.space.test:delete(999)", + "box.space.test:delete(999)", + [lambda x,y: x == 'connected' and y == 'connected',] +) +check_replication([master, replica]) + +# cleanup +replica.stop() +replica.cleanup(True) +server.stop() +server.cleanup(True) +server.deploy()