diff --git a/src/main.rs b/src/main.rs index fc4fd28db35beaf9d44f61f9e04d49759a2239dd..630222dea8380f92285880ec73d09440a850fce2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,7 +14,6 @@ use std::time::{Duration, Instant}; use clap::StructOpt as _; use protobuf::Message as _; -use protobuf::ProtobufEnum as _; mod app; mod args; @@ -453,7 +452,7 @@ fn start_boot(args: &args::Run) { }; let ctx = traft::EntryContextConfChange { peers: vec![peer] }; let e = traft::Entry { - entry_type: raft::EntryType::EntryConfChange.value(), + entry_type: raft::EntryType::EntryConfChange, index: 1, term: 1, data: conf_change.write_to_bytes().unwrap(), diff --git a/src/traft/mod.rs b/src/traft/mod.rs index 3968e5a2b464e5489a5e9be725a8cea493175bc3..010598b3852e9be0c84bb4966f21dd0cece6008f 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -16,7 +16,6 @@ use std::convert::TryFrom; use uuid::Uuid; use protobuf::Message as _; -use protobuf::ProtobufEnum as _; pub use network::ConnectionPool; pub use storage::Storage; @@ -142,6 +141,9 @@ impl Peer {} /// #[derive(Clone, Serialize, Deserialize, PartialEq)] pub struct Entry { + /// See correspondig definition in `raft-rs`: + /// - <https://github.com/tikv/raft-rs/blob/v0.6.0/proto/proto/eraftpb.proto#L7> + /// /// ``` /// enum EntryType { /// EntryNormal = 0; @@ -149,7 +151,8 @@ pub struct Entry { /// EntryConfChangeV2 = 2; /// } /// ``` - pub entry_type: i32, + #[serde(with = "entry_type_as_i32")] + pub entry_type: raft::EntryType, pub index: u64, pub term: u64, @@ -161,13 +164,34 @@ pub struct Entry { pub context: Option<EntryContext>, } +mod entry_type_as_i32 { + use super::error::CoercionError::UnknownEntryType; + use ::raft::prelude as raft; + use protobuf::ProtobufEnum as _; + use serde::{self, Deserialize, Deserializer, Serializer}; + + use serde::de::Error as _; + + pub fn serialize<S>(t: &raft::EntryType, ser: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + ser.serialize_i32(t.value()) + } + + pub fn deserialize<'de, D>(de: D) -> Result<raft::EntryType, D::Error> + where + D: Deserializer<'de>, + { + let t = i32::deserialize(de)?; + raft::EntryType::from_i32(t).ok_or_else(|| D::Error::custom(UnknownEntryType(t))) + } +} + impl std::fmt::Debug for Entry { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { f.debug_struct("Entry") - .field( - "entry_type", - &raft::EntryType::from_i32(self.entry_type).ok_or(self.entry_type), - ) + .field("entry_type", &self.entry_type) .field("index", &self.index) .field("term", &self.term) .field("data", &self.data) @@ -258,7 +282,7 @@ impl TryFrom<&raft::Entry> for self::Entry { fn try_from(e: &raft::Entry) -> Result<Self, Self::Error> { let ret = Self { - entry_type: e.entry_type.value(), + entry_type: e.entry_type, index: e.index, term: e.term, data: Vec::from(e.get_data()), @@ -274,21 +298,16 @@ impl TryFrom<&raft::Entry> for self::Entry { } } -impl TryFrom<self::Entry> for raft::Entry { - type Error = error::CoercionError; - - fn try_from(row: self::Entry) -> Result<raft::Entry, Self::Error> { - let ret = raft::Entry { - entry_type: raft::EntryType::from_i32(row.entry_type) - .ok_or(Self::Error::UnknownEntryType(row.entry_type))?, +impl From<self::Entry> for raft::Entry { + fn from(row: self::Entry) -> raft::Entry { + raft::Entry { + entry_type: row.entry_type, index: row.index, term: row.term, data: row.data.into(), context: EntryContext::write_to_bytes(row.context.as_ref()).into(), ..Default::default() - }; - - Ok(ret) + } } } diff --git a/src/traft/node.rs b/src/traft/node.rs index 28aa14462b0974d1168514ae0543813b6d6e6d65..77bf6df01d452618e51f2ff9afdc8017168a040c 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -332,7 +332,7 @@ fn handle_committed_entries( } }; - if entry.entry_type == raft::EntryType::EntryNormal as i32 { + if entry.entry_type == raft::EntryType::EntryNormal { handle_committed_normal_entry(entry, notifications, joint_state_latch) } else { handle_committed_conf_change(entry, raw_node, pool, joint_state_latch, config_changed) @@ -355,6 +355,7 @@ fn handle_committed_normal_entry( notifications: &mut HashMap<LogicalClock, Notify>, joint_state_latch: &mut Option<JointStateLatch>, ) { + assert_eq!(entry.entry_type, raft::EntryType::EntryNormal); let result = entry.op().unwrap_or(&traft::Op::Nop).on_commit(); if let Some(lc) = entry.lc() { @@ -398,13 +399,13 @@ fn handle_committed_conf_change( // and in joint state transitions. let conf_state; - if entry.entry_type == raft::EntryType::EntryConfChange as i32 { + if entry.entry_type == raft::EntryType::EntryConfChange { let mut cc = raft::ConfChange::default(); cc.merge_from_bytes(&entry.data).unwrap(); *config_changed = true; conf_state = raw_node.apply_conf_change(&cc).unwrap(); - } else { + } else if entry.entry_type == raft::EntryType::EntryConfChangeV2 { let mut cc = raft::ConfChangeV2::default(); cc.merge_from_bytes(&entry.data).unwrap(); @@ -421,6 +422,8 @@ fn handle_committed_conf_change( // moment raft-rs will implicitly propose another empty // conf change that represents leaving the joint state. conf_state = raw_node.apply_conf_change(&cc).unwrap() + } else { + unreachable!(); }; Storage::persist_conf_state(&conf_state).unwrap(); diff --git a/src/traft/storage.rs b/src/traft/storage.rs index d9de8c9fb72c153cfe2121685cea857824a84a5b..66ee4957b33c2d749f529c3b2fe29f0941e487ed 100644 --- a/src/traft/storage.rs +++ b/src/traft/storage.rs @@ -306,8 +306,7 @@ impl Storage { if row.index >= high { break; } - let entry = raft::Entry::try_from(row)?; - ret.push(entry); + ret.push(row.into()); } Ok(ret) @@ -467,7 +466,7 @@ inventory::submit!(crate::InnerTest { raft_log.put(&(1337, 99, 1, "", ())).unwrap(); assert_err!( Storage.entries(1, 100, u64::MAX), - "unknown error unknown entry type (1337)" + "unknown error Failed to decode tuple: unknown entry type (1337)" ); raft_log.put(&(0, 99, 1, "", false)).unwrap();