diff --git a/picolib/lib.rs b/picolib/lib.rs index 792e49585db01dc7b98af3d1d76b17029c104f41..243645d52f1155b691d0e3c0b37f9650a4f28daf 100644 --- a/picolib/lib.rs +++ b/picolib/lib.rs @@ -19,6 +19,7 @@ use message::Message; use std::cell::Ref; use std::cell::RefCell; use std::convert::TryFrom; +use std::time::Duration; #[derive(Default)] pub struct Stash { @@ -75,7 +76,12 @@ pub unsafe extern "C" fn luaopen_picolib(l: *mut std::ffi::c_void) -> c_int { ); luamod.set( "raft_propose_eval", - tlua::function1(|x: String| raft_propose(Message::EvalLua { code: x })), + tlua::function2(|timeout: f64, x: String| { + raft_propose_wait_applied( + Message::EvalLua { code: x }, + Duration::from_secs_f64(timeout), + ) + }), ); { l.exec( @@ -190,6 +196,16 @@ fn raft_propose(msg: Message) { tlog!(Debug, ",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,"); } +fn raft_propose_wait_applied(msg: Message, timeout: Duration) -> bool { + let stash = Stash::access(); + let raft_ref = stash.raft_node(); + let raft_node = raft_ref.as_ref().expect("Picodata not running yet"); + tlog!(Debug, "propose {:?} ................................", msg); + let res = raft_node.propose_wait_applied(&msg, timeout); + tlog!(Debug, ",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,"); + res +} + fn handle_committed_data(data: &[u8]) { use Message::*; diff --git a/picolib/traft.rs b/picolib/traft.rs index c7a900c1ba0e8503fa7b041458c151674afdc639..7dabd3699efe35ef0071f26c4f476f469c22eff8 100644 --- a/picolib/traft.rs +++ b/picolib/traft.rs @@ -2,6 +2,8 @@ mod node; mod storage; pub use node::Node; +use serde::{Deserialize, Serialize}; +use std::convert::TryFrom; pub use storage::Storage; pub mod row { mod entry; @@ -10,3 +12,34 @@ pub mod row { pub use entry::Entry; pub use message::Message; } + +#[derive(Clone, Debug, Default, Serialize, Deserialize, Hash, PartialEq, Eq)] +pub struct LogicalClock { + id: u64, + gen: u64, + count: u64, +} + +impl LogicalClock { + pub fn new(id: u64, gen: u64) -> Self { + Self { id, gen, count: 0 } + } + + pub fn inc(&mut self) { + self.count += 1; + } +} + +impl TryFrom<&[u8]> for LogicalClock { + type Error = rmp_serde::decode::Error; + + fn try_from(data: &[u8]) -> Result<Self, Self::Error> { + rmp_serde::from_read_ref(data) + } +} + +impl From<&LogicalClock> for Vec<u8> { + fn from(lc: &LogicalClock) -> Vec<u8> { + rmp_serde::to_vec(lc).unwrap() + } +} diff --git a/picolib/traft/node.rs b/picolib/traft/node.rs index ddfdfcfc50b9f38ade269c53ed8f9d67029eebb4..770418ec4a572bc6c345c0ff79be3f072e039a30 100644 --- a/picolib/traft/node.rs +++ b/picolib/traft/node.rs @@ -2,14 +2,18 @@ use ::raft::prelude as raft; use ::raft::Error as RaftError; use ::tarantool::fiber; use ::tarantool::util::IntoClones; +use std::collections::HashMap; +use std::convert::TryFrom; use std::time::Duration; use std::time::Instant; use crate::tlog; +use crate::traft::LogicalClock; use crate::traft::Storage; type RawNode = raft::RawNode<Storage>; +type Notify = fiber::Channel<()>; pub struct Node { _main_loop: fiber::LuaUnitJoinHandle, @@ -18,7 +22,8 @@ pub struct Node { #[derive(Clone, Debug)] enum Request { - Propose(Vec<u8>), + Propose { data: Vec<u8> }, + ProposeWaitApplied { data: Vec<u8>, notify: Notify }, Step(raft::Message), } @@ -36,11 +41,38 @@ impl Node { }) } - pub fn propose<T: Into<Vec<u8>>>(&self, data: T) { - let req = Request::Propose(data.into()); + pub fn propose(&self, data: impl Into<Vec<u8>>) { + let req = Request::Propose { data: data.into() }; self.inbox.send(req).unwrap(); } + pub fn propose_wait_applied(&self, data: impl Into<Vec<u8>>, timeout: Duration) -> bool { + let (rx, tx) = fiber::Channel::new(1).into_clones(); + let now = Instant::now(); + + let req = Request::ProposeWaitApplied { + data: data.into(), + notify: tx, + }; + + match self.inbox.send_timeout(req, timeout) { + Err(fiber::SendError::Disconnected(_)) => unreachable!(), + Err(fiber::SendError::Timeout(_)) => { + rx.close(); + return false; + } + Ok(()) => (), + } + + match rx.recv_timeout(timeout.saturating_sub(now.elapsed())) { + Err(_) => { + rx.close(); + false + } + Ok(()) => true, + } + } + pub fn step(&self, msg: raft::Message) { let req = Request::Step(msg); self.inbox.send(req).unwrap(); @@ -50,13 +82,37 @@ impl Node { fn raft_main(inbox: fiber::Channel<Request>, mut raw_node: RawNode, on_commit: fn(&[u8])) { let mut next_tick = Instant::now() + Node::TICK; + let mut notifications: HashMap<LogicalClock, Notify> = HashMap::new(); + let mut lc = { + let id = Storage::id().unwrap().unwrap(); + let gen = Storage::gen().unwrap().unwrap_or(0) + 1; + Storage::persist_gen(gen).unwrap(); + LogicalClock::new(id, gen) + }; + loop { + // Clean up obsolete notifications + notifications.retain(|_, notify: &mut Notify| !notify.is_closed()); + match inbox.recv_timeout(Node::TICK) { - Ok(Request::Propose(data)) => { - raw_node.propose(vec![], data).unwrap(); + Ok(Request::Propose { data }) => { + if let Err(e) = raw_node.propose(vec![], data) { + tlog!(Error, "{e}"); + } + } + Ok(Request::ProposeWaitApplied { data, notify }) => { + lc.inc(); + if let Err(e) = raw_node.propose(Vec::from(&lc), data) { + tlog!(Error, "{e}"); + notify.close(); + } else { + notifications.insert(lc.clone(), notify); + } } Ok(Request::Step(msg)) => { - raw_node.step(msg).unwrap(); + if let Err(e) = raw_node.step(msg) { + tlog!(Error, "{e}"); + } } Err(fiber::RecvError::Timeout) => (), Err(fiber::RecvError::Disconnected) => unreachable!(), @@ -91,12 +147,17 @@ fn raft_main(inbox: fiber::Channel<Request>, mut raw_node: RawNode, on_commit: f unimplemented!(); } - let handle_committed_entries = |committed_entries: Vec<raft::Entry>| { + let mut handle_committed_entries = |committed_entries: Vec<raft::Entry>| { for entry in committed_entries { Storage::persist_applied(entry.index).unwrap(); if entry.get_entry_type() == raft::EntryType::EntryNormal { - on_commit(entry.get_data()) + on_commit(entry.get_data()); + if let Ok(lc) = LogicalClock::try_from(entry.get_context()) { + if let Some(notify) = notifications.remove(&lc) { + notify.try_send(()).ok(); + } + } } // TODO: handle EntryConfChange diff --git a/picolib/traft/row/entry.rs b/picolib/traft/row/entry.rs index 0d1420fe7c4b8adabb73f6a362204b67b1c0c56a..c9507080c759320532547699cac40a4712a004ea 100644 --- a/picolib/traft/row/entry.rs +++ b/picolib/traft/row/entry.rs @@ -4,6 +4,7 @@ use serde::Serialize; use std::convert::TryFrom; use crate::error::CoercionError; +use crate::traft::LogicalClock; use crate::Message; #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] @@ -12,6 +13,8 @@ pub struct Entry { pub index: u64, pub term: u64, pub msg: Message, + #[serde(default)] + pub ctx: Option<LogicalClock>, } impl ::tarantool::tuple::AsTuple for Entry {} @@ -24,6 +27,7 @@ impl TryFrom<raft::Entry> for self::Entry { index: e.get_index(), term: e.get_term(), msg: Message::try_from(e.get_data())?, + ctx: LogicalClock::try_from(e.get_context()).ok(), }) } } @@ -47,6 +51,11 @@ impl TryFrom<self::Entry> for raft::Entry { let bytes: Vec<u8> = Vec::from(&row.msg); ret.set_data(bytes.into()); + if let Some(ctx) = row.ctx { + let ctx: Vec<u8> = Vec::from(&ctx); + ret.set_context(ctx.into()); + } + Ok(ret) } } @@ -85,12 +94,35 @@ inventory::submit!(crate::InnerTest { assert_eq!( ser(Entry::default()), - json!(["EntryNormal", 0u64, 0u64, ["empty"]]) + json!(["EntryNormal", 0u64, 0u64, ["empty"], null]) ); assert_eq!( ser(Entry::new(Message::Info { msg: "!".into() })), - json!(["EntryNormal", 0u64, 0u64, ["info", "!"]]) + json!(["EntryNormal", 0u64, 0u64, ["info", "!"], null]) + ); + + assert_eq!( + ser(Entry { + entry_type: "EntryNormal".into(), + index: 1001, + term: 1002, + msg: Message::EvalLua { + code: "return nil".into(), + }, + ctx: Some(LogicalClock { + id: 1, + gen: 2, + count: 101 + }), + }), + json!([ + "EntryNormal", + 1001u64, + 1002u64, + ["eval_lua", "return nil"], + [1, 2, 101], + ]) ); assert_eq!( @@ -101,8 +133,15 @@ inventory::submit!(crate::InnerTest { msg: Message::EvalLua { code: "return nil".into(), }, + ctx: None, }), - json!(["EntryNormal", 1001u64, 1002u64, ["eval_lua", "return nil"]]) + json!([ + "EntryNormal", + 1001u64, + 1002u64, + ["eval_lua", "return nil"], + null, + ]) ); let msg = Message::Info { msg: "?".into() }; @@ -112,6 +151,7 @@ inventory::submit!(crate::InnerTest { index: 99, term: 2, msg: msg.clone(), + ctx: None, }) .expect("coercing raft::Entry from self::Entry failed"), raft::Entry { diff --git a/picolib/traft/storage.rs b/picolib/traft/storage.rs index d9cc4d77f92fc99f8b7a89e2d94935f332091b88..d83fb181bcf473886c553371308a8de5dbc8a874 100644 --- a/picolib/traft/storage.rs +++ b/picolib/traft/storage.rs @@ -42,6 +42,7 @@ impl Storage { {name = 'index', type = 'unsigned', is_nullable = false}, {name = 'term', type = 'unsigned', is_nullable = false}, {name = 'msg', type = 'any', is_nullable = true}, + {name = 'ctx', type = 'any', is_nullable = true}, } }) box.space.raft_log:create_index('pk', { @@ -112,6 +113,11 @@ impl Storage { Storage::raft_state("id") } + /// Node generation i.e. the number of restarts. + pub fn gen() -> Result<Option<u64>, StorageError> { + Storage::raft_state("gen") + } + pub fn term() -> Result<Option<u64>, StorageError> { Storage::raft_state("term") } @@ -144,6 +150,10 @@ impl Storage { Storage::persist_raft_state("vote", vote) } + pub fn persist_gen(gen: u64) -> Result<(), StorageError> { + Storage::persist_raft_state("gen", gen) + } + pub fn persist_id(id: u64) -> Result<(), StorageError> { Storage::space(RAFT_STATE)? // We use `insert` instead of `replace` here