diff --git a/src/main.rs b/src/main.rs index 2900ba956566762572e5d9285503ecfa5f48543c..5e8f2990e69701926bc3e71a07194c62fb92dfef 100644 --- a/src/main.rs +++ b/src/main.rs @@ -776,7 +776,8 @@ fn start_boot(args: &args::Run) { args.replicaset_id.clone(), args.advertise_address(), args.failure_domain(), - ); + ) + .expect("failed adding initial peer"); let raft_id = peer.raft_id; let instance_id = peer.instance_id.clone(); diff --git a/src/storage.rs b/src/storage.rs index 92f8c0c1228e629b7a32483d2cbd50c94b5d39ff..c688a67229fd9b7227e3b189c3248f829f388431 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -7,7 +7,6 @@ use crate::traft::error::Error; use crate::traft::rpc::sharding::cfg::ReplicasetWeights; use crate::traft::Migration; use crate::traft::RaftId; -use crate::traft::RaftIndex; use crate::traft::Replicaset; use crate::traft::Result; @@ -398,7 +397,6 @@ impl Peers { .index_builder(Self::INDEX_REPLICASET_ID) .unique(false) .part(peer_field::ReplicasetId) - .part(peer_field::CommitIndex) .if_not_exists(true) .create()?; @@ -565,7 +563,6 @@ define_peer_fields! { RaftId : traft::RaftId = ("raft_id", FieldType::Unsigned) ReplicasetId : String = ("replicaset_id", FieldType::String) ReplicasetUuid : String = ("replicaset_uuid", FieldType::String) - CommitIndex : RaftIndex = ("commit_index", FieldType::Unsigned) CurrentGrade : traft::CurrentGrade = ("current_grade", FieldType::Array) TargetGrade : traft::TargetGrade = ("target_grade", FieldType::Array) FailureDomain : traft::FailureDomain = ("failure_domain", FieldType::Map) @@ -812,13 +809,13 @@ inventory::submit!(crate::InnerTest { for peer in vec![ // r1 - ("i1", "i1-uuid", 1u64, "r1", "r1-uuid", 1u64, (CurrentGrade::Online, 0), (TargetGrade::Online, 0), &faildom,), - ("i2", "i2-uuid", 2u64, "r1", "r1-uuid", 2, (CurrentGrade::Online, 0), (TargetGrade::Online, 0), &faildom,), + ("i1", "i1-uuid", 1u64, "r1", "r1-uuid", (CurrentGrade::Online, 0), (TargetGrade::Online, 0), &faildom,), + ("i2", "i2-uuid", 2u64, "r1", "r1-uuid", (CurrentGrade::Online, 0), (TargetGrade::Online, 0), &faildom,), // r2 - ("i3", "i3-uuid", 3u64, "r2", "r2-uuid", 10, (CurrentGrade::Online, 0), (TargetGrade::Online, 0), &faildom,), - ("i4", "i4-uuid", 4u64, "r2", "r2-uuid", 10, (CurrentGrade::Online, 0), (TargetGrade::Online, 0), &faildom,), + ("i3", "i3-uuid", 3u64, "r2", "r2-uuid", (CurrentGrade::Online, 0), (TargetGrade::Online, 0), &faildom,), + ("i4", "i4-uuid", 4u64, "r2", "r2-uuid", (CurrentGrade::Online, 0), (TargetGrade::Online, 0), &faildom,), // r3 - ("i5", "i5-uuid", 5u64, "r3", "r3-uuid", 10, (CurrentGrade::Online, 0), (TargetGrade::Online, 0), &faildom,), + ("i5", "i5-uuid", 5u64, "r3", "r3-uuid", (CurrentGrade::Online, 0), (TargetGrade::Online, 0), &faildom,), ] { raft_group.put(&peer).unwrap(); let (_, _, raft_id, ..) = peer; @@ -844,9 +841,9 @@ inventory::submit!(crate::InnerTest { " in unique index \"raft_id\"", " in space \"raft_group\"", " with old tuple", - r#" - ["i1", "i1-uuid", 1, "r1", "r1-uuid", 1, ["{gon}", 0], ["{tgon}", 0], {{"A": "B"}}]"#, + r#" - ["i1", "i1-uuid", 1, "r1", "r1-uuid", ["{gon}", 0], ["{tgon}", 0], {{"A": "B"}}]"#, " and new tuple", - r#" - ["i99", "", 1, "", "", 0, ["{goff}", 0], ["{tgoff}", 0], {{}}]"#, + r#" - ["i99", "", 1, "", "", ["{goff}", 0], ["{tgoff}", 0], {{}}]"#, ), gon = CurrentGrade::Online, goff = CurrentGrade::Offline, diff --git a/src/traft/mod.rs b/src/traft/mod.rs index f701a2b3efa40ff30adf97adacd6764fee0bacfe..ccbf6b579e42393f604a183af990bb0c43c62b27 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -416,10 +416,6 @@ pub struct Peer { pub replicaset_id: ReplicasetId, pub replicaset_uuid: String, - /// Index of the most recent raft log entry that persisted this peer. - /// `0` means it's not committed yet. - pub commit_index: RaftIndex, - /// The cluster's mind about actual state of this instance's activity. pub current_grade: CurrentGrade, /// The desired state of this instance @@ -471,7 +467,6 @@ impl Peer { raft_id: Default::default(), replicaset_id: Default::default(), replicaset_uuid: Default::default(), - commit_index: Default::default(), current_grade: Default::default(), target_grade: Default::default(), failure_domain: Default::default(), @@ -483,12 +478,11 @@ impl std::fmt::Display for Peer { #[rustfmt::skip] fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { return write!(f, - "({}, {}, {}, {}, {}, {})", + "({}, {}, {}, {}, {})", self.instance_id, self.raft_id, self.replicaset_id, GradeTransition { from: self.current_grade, to: self.target_grade }, - self.commit_index, &self.failure_domain, ); diff --git a/src/traft/node.rs b/src/traft/node.rs index a1eb3b8633ac6e49083643c794602e093960e630..7c1842e6043d4c928b9052e430399e1bec9d5c1a 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -468,7 +468,7 @@ impl NodeImpl { let topology = self.topology_mut()?; // FIXME: remove this once we introduce some 'async' stuff let notify_for_address; - let mut peer = match req { + let peer = match req { TopologyRequest::Join(JoinRequest { instance_id, replicaset_id, @@ -504,8 +504,6 @@ impl NodeImpl { .map_err(RaftError::ConfChangeError)? } }; - peer.commit_index = self.raw_node.raft.raft_log.last_index() + 1; - let (lc, notify) = self.schedule_notification(); let ctx = traft::EntryContextNormal::new(lc, Op::persist_peer(peer)); diff --git a/src/traft/topology.rs b/src/traft/topology.rs index 7737b0f6bf214fa82d0405ad7608cf2dab8ee7e9..0f1002938923829f85d5919fbcc892e7b3cebf8b 100644 --- a/src/traft/topology.rs +++ b/src/traft/topology.rs @@ -10,8 +10,6 @@ use crate::traft::{CurrentGrade, CurrentGradeVariant, Grade, TargetGrade, Target use crate::traft::{InstanceId, RaftId, ReplicasetId}; use crate::util::Uppercase; -use raft::INVALID_INDEX; - pub struct Topology { replication_factor: u8, max_raft_id: RaftId, @@ -153,7 +151,6 @@ impl Topology { raft_id, replicaset_id, replicaset_uuid, - commit_index: INVALID_INDEX, current_grade: CurrentGrade::offline(0), target_grade: TargetGrade::offline(0), failure_domain, @@ -215,19 +212,15 @@ impl Topology { } } -// Create first peer in the cluster +/// Create first peer in the cluster pub fn initial_peer( instance_id: Option<InstanceId>, replicaset_id: Option<ReplicasetId>, advertise: Address, failure_domain: FailureDomain, -) -> (Peer, Address) { +) -> Result<(Peer, Address), String> { let mut topology = Topology::from_peers(vec![]); - let (mut peer, advertise) = topology - .join(instance_id, replicaset_id, advertise, failure_domain) - .unwrap(); - peer.commit_index = 1; - (peer, advertise) + topology.join(instance_id, replicaset_id, advertise, failure_domain) } #[rustfmt::skip] @@ -297,7 +290,6 @@ mod tests { replicaset_id: $replicaset_id.into(), instance_uuid: instance_uuid($instance_id), replicaset_uuid: replicaset_uuid($replicaset_id), - commit_index: raft::INVALID_INDEX, current_grade: $current_grade.into_grade(), target_grade: $target_grade.into_grade(), diff --git a/test/int/test_basics.py b/test/int/test_basics.py index 9b365f8a1a5cd6224c65a507a02fab0d28141109..2485976a4919f606acceafe2fe1abb3e63a939a0 100644 --- a/test/int/test_basics.py +++ b/test/int/test_basics.py @@ -218,18 +218,18 @@ def test_raft_log(instance: Instance): |index|term| lc |contents| +-----+----+-----+--------+ | 1 | 1 |1.0.1|Insert(pico_peer_addresses, [1,"127.0.0.1:{p}"])| -| 2 | 1 |1.0.2|PersistPeer(i1, 1, r1, Offline(0), 1, {b})| +| 2 | 1 |1.0.2|PersistPeer(i1, 1, r1, Offline(0), {b})| | 3 | 1 |1.0.3|Insert(cluster_state, ["replication_factor",1])| | 4 | 1 |1.0.4|Insert(cluster_state, ["desired_schema_version",0])| | 5 | 1 | |AddNode(1)| | 6 | 2 | |-| -| 7 | 2 |1.1.1|PersistPeer(i1, 1, r1, Offline(0) -> Online(1), 7, {b})| -| 8 | 2 |1.1.2|PersistPeer(i1, 1, r1, RaftSynced(1) -> Online(1), 8, {b})| -| 9 | 2 |1.1.3|PersistPeer(i1, 1, r1, Replicated(1) -> Online(1), 9, {b})| +| 7 | 2 |1.1.1|PersistPeer(i1, 1, r1, Offline(0) -> Online(1), {b})| +| 8 | 2 |1.1.2|PersistPeer(i1, 1, r1, RaftSynced(1) -> Online(1), {b})| +| 9 | 2 |1.1.3|PersistPeer(i1, 1, r1, Replicated(1) -> Online(1), {b})| | 10 | 2 |1.1.4|Insert(replicasets, ["r1","e0df68c5-e7f9-395f-86b3-30ad9e1b7b07","i1",1.0,0])| -| 11 | 2 |1.1.5|PersistPeer(i1, 1, r1, ShardingInitialized(1) -> Online(1), 11, {b})| +| 11 | 2 |1.1.5|PersistPeer(i1, 1, r1, ShardingInitialized(1) -> Online(1), {b})| | 12 | 2 |1.1.6|Replace(cluster_state, ["vshard_bootstrapped",true])| -| 13 | 2 |1.1.7|PersistPeer(i1, 1, r1, Online(1), 13, {b})| +| 13 | 2 |1.1.7|PersistPeer(i1, 1, r1, Online(1), {b})| +-----+----+-----+--------+ """.format( # noqa: E501 p=instance.port, b="{}"