Skip to content
Snippets Groups Projects
Commit b23c104a authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon: Committed by Yaroslav Dynnikov
Browse files

test: add a sync replication test

parent be7183bd
No related branches found
No related tags found
1 merge request!332fix: don't use tarantool synchronous replication, it's broken
......@@ -10,7 +10,7 @@ use ::raft::Error as RaftError;
use ::raft::StateRole as RaftStateRole;
use ::raft::StorageError;
use ::raft::INVALID_ID;
use ::tarantool::error::TransactionError;
use ::tarantool::error::{TarantoolError, TransactionError};
use ::tarantool::fiber;
use ::tarantool::fiber::{Cond, Mutex};
use ::tarantool::proc;
......@@ -746,7 +746,7 @@ impl NodeImpl {
self.handle_read_states(ready.read_states());
start_transaction(|| -> Result<(), TransactionError> {
if let Err(e) = start_transaction(|| -> Result<(), TransactionError> {
// Apply committed entries.
self.handle_committed_entries(ready.committed_entries(), topology_changed, expelled);
......@@ -763,8 +763,11 @@ impl NodeImpl {
}
Ok(())
})
.unwrap();
}) {
tlog!(Error, "transaction failed: {e}, {}", TarantoolError::last());
tlog!(Warning, "raft ready dropped: {ready:#?}");
panic!("no good");
}
// This bunch of messages is special. It must be sent only
// AFTER the HardState, Entries and Snapshot are persisted
......@@ -777,7 +780,7 @@ impl NodeImpl {
// Send out messages to the other nodes.
self.handle_messages(light_rd.take_messages());
start_transaction(|| -> Result<(), TransactionError> {
if let Err(e) = start_transaction(|| -> Result<(), TransactionError> {
// Update commit index.
if let Some(commit) = light_rd.commit_index() {
self.storage.raft.persist_commit(commit).unwrap();
......@@ -787,8 +790,11 @@ impl NodeImpl {
self.handle_committed_entries(light_rd.committed_entries(), topology_changed, expelled);
Ok(())
})
.unwrap();
}) {
tlog!(Error, "transaction failed: {e}, {}", TarantoolError::last());
tlog!(Warning, "raft light ready dropped: {light_rd:#?}");
panic!("no good");
}
// Advance the apply index.
self.raw_node.advance_apply();
......
import funcy # type: ignore
import pytest
from conftest import (
Cluster,
Instance,
)
@pytest.fixture
def cluster3(cluster: Cluster):
cluster.deploy(instance_count=3, init_replication_factor=3)
return cluster
@funcy.retry(tries=30, timeout=0.2)
def wait_repl_leader(i: Instance, other_than=None):
repl_leader = i.eval("return box.info.election.leader")
assert repl_leader
if other_than:
assert repl_leader != other_than
return repl_leader
@funcy.retry(tries=60, timeout=0.2)
def wait_vclock(i: Instance, vclock_expected: dict[int, int]):
vclock_actual = i.eval("return box.info.vclock")
del vclock_actual[0]
for k, v_exp in vclock_expected.items():
assert (k, v_exp) <= (k, vclock_actual[k])
# fmt: off
def test_2_of_3_writable(cluster3: Cluster):
i1, i2, i3 = cluster3.instances
rl = wait_repl_leader(i1)
assert rl == wait_repl_leader(i2)
assert rl == wait_repl_leader(i3)
leader, i2, i3 = sorted(
[i1, i2, i3],
key=lambda i: rl == i.eval("return box.info.id"),
reverse=True
)
rl_vclock = leader.eval("return box.info.vclock")
del rl_vclock[0]
wait_vclock(i2, rl_vclock) # sometimes fails with i2 missing one transaction
wait_vclock(i3, rl_vclock) # sometimes fails with i3 missing one transaction
rl_vclock = leader.eval("""
box.schema.space.create('test_space', {is_sync = true})
:create_index('pk')
box.space.test_space:replace {1}
return box.info.vclock
""")
del rl_vclock[0]
wait_vclock(i2, rl_vclock)
assert [[1]] == i2.eval("return box.space.test_space:select()")
wait_vclock(i3, rl_vclock)
assert [[1]] == i3.eval("return box.space.test_space:select()")
leader.terminate()
rl = wait_repl_leader(i2, other_than=rl)
assert rl == wait_repl_leader(i3)
old_leader = leader
leader, i3 = sorted(
[i2, i3],
key=lambda i: rl == i.eval("return box.info.id"),
reverse=True
)
rl_vclock = leader.eval("return box.info.vclock")
del rl_vclock[0]
wait_vclock(i3, rl_vclock)
rl_vclock = leader.eval("""
box.space.test_space:replace {2}
return box.info.vclock
""")
del rl_vclock[0]
wait_vclock(i3, rl_vclock)
assert [[1], [2]] == i3.eval("return box.space.test_space:select()")
print(i3.call("picolib.raft_log", dict(return_string=True)))
print(f"{old_leader=}")
old_leader.start()
old_leader.wait_online()
assert wait_repl_leader(old_leader) == rl
wait_vclock(old_leader, rl_vclock)
assert [[1], [2]] == old_leader.eval("return box.space.test_space:select()")
# fmt: on
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment