diff --git a/picolib/lib.rs b/picolib/lib.rs index ea17e80d1d3732e5e236fe70aacfb188d9ec44fd..1dbcba913fa7c867a62fd2b61ecc72441bb6e58f 100644 --- a/picolib/lib.rs +++ b/picolib/lib.rs @@ -1,5 +1,8 @@ use ::raft::prelude as raft; +use ::tarantool::error::TarantoolErrorCode::ProcC as ProcCError; +use ::tarantool::set_error; use ::tarantool::tlua; +use ::tarantool::tuple::{FunctionArgs, FunctionCtx, Tuple}; use indoc::indoc; use std::os::raw::c_int; @@ -126,11 +129,12 @@ fn main_run() { tarantool::eval( r#" box.schema.user.grant('guest', 'super', nil, nil, {if_not_exists = true}) - box.cfg({log_level = 6}) + box.cfg({log_level = 5}) "#, ); traft::Storage::init_schema(); + init_handlers(); let raft_id: u64 = { // The id already stored in tarantool snashot @@ -181,6 +185,7 @@ fn main_run() { let raft_cfg = raft::Config { id: raft_id, + pre_vote: true, applied: traft::Storage::applied().unwrap().unwrap_or_default(), ..Default::default() }; @@ -207,19 +212,14 @@ fn raft_propose(msg: Message) { let stash = Stash::access(); let raft_ref = stash.raft_node(); let raft_node = raft_ref.as_ref().expect("Picodata not running yet"); - tlog!(Debug, "propose {:?} ................................", msg); - raft_node.propose(&msg); - tlog!(Debug, ",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,"); + raft_node.propose(&msg) } fn raft_propose_wait_applied(msg: Message, timeout: Duration) -> bool { let stash = Stash::access(); let raft_ref = stash.raft_node(); let raft_node = raft_ref.as_ref().expect("Picodata not running yet"); - tlog!(Debug, "propose {:?} ................................", msg); - let res = raft_node.propose_wait_applied(&msg, timeout); - tlog!(Debug, ",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,"); - res + raft_node.propose_wait_applied(&msg, timeout) } fn handle_committed_data(data: &[u8]) { @@ -234,3 +234,39 @@ fn handle_committed_data(data: &[u8]) { Err(why) => tlog!(Error, "cannot decode raft entry data: {}", why), } } + +fn init_handlers() { + crate::tarantool::eval( + r#" + box.schema.func.create('picolib.raft_interact', { + language = "C", + if_not_exists = true + }) + "#, + ); +} + +#[no_mangle] +pub extern "C" fn raft_interact(_: FunctionCtx, args: FunctionArgs) -> c_int { + let stash = Stash::access(); + let raft_ref = stash.raft_node(); + let raft_node = raft_ref + .as_ref() + .expect("picodata should already be running"); + + // Conversion pipeline: + // FunctionArgs -> Tuple -?-> traft::row::Message -?-> raft::Message; + + let m: traft::row::Message = match Tuple::from(args).into_struct() { + Ok(v) => v, + Err(e) => return set_error!(ProcCError, "{e}"), + }; + + let m = match raft::Message::try_from(m) { + Ok(v) => v, + Err(e) => return set_error!(ProcCError, "{e}"), + }; + + raft_node.step(m); + 0 +} diff --git a/picolib/traft/network.rs b/picolib/traft/network.rs index 1ba6f94ce18cb54a5930f9c344c9a9fdad23aff6..307b8609e889c9ce2b38e87b58ca8597f174e246 100644 --- a/picolib/traft/network.rs +++ b/picolib/traft/network.rs @@ -162,9 +162,9 @@ inventory::submit!(crate::InnerTest { fiber::Cond::new().wait() }), ); - // let () = l - // .eval("box.schema.func.drop('picolib.raft_interact')") - // .unwrap(); + let () = l + .eval("box.schema.func.drop('picolib.raft_interact')") + .unwrap(); // Connect to the current Tarantool instance let mut pool = ConnectionPool::with_timeout(Duration::from_millis(50)); diff --git a/picolib/traft/node.rs b/picolib/traft/node.rs index 770418ec4a572bc6c345c0ff79be3f072e039a30..7c691308730601d1f7d282bdac01b8a6170d8e03 100644 --- a/picolib/traft/node.rs +++ b/picolib/traft/node.rs @@ -9,6 +9,7 @@ use std::time::Duration; use std::time::Instant; use crate::tlog; +use crate::traft::ConnectionPool; use crate::traft::LogicalClock; use crate::traft::Storage; @@ -81,6 +82,12 @@ impl Node { fn raft_main(inbox: fiber::Channel<Request>, mut raw_node: RawNode, on_commit: fn(&[u8])) { let mut next_tick = Instant::now() + Node::TICK; + let mut pool = ConnectionPool::with_timeout(Node::TICK * 4); + + // This is a temporary hack until fair joining is implemented + for peer in Storage::peers().unwrap() { + pool.connect(peer.raft_id, &peer.uri); + } let mut notifications: HashMap<LogicalClock, Notify> = HashMap::new(); let mut lc = { @@ -132,7 +139,10 @@ fn raft_main(inbox: fiber::Channel<Request>, mut raw_node: RawNode, on_commit: f let mut ready: raft::Ready = raw_node.ready(); let handle_messages = |msgs: Vec<raft::Message>| { - for _msg in msgs { + for msg in msgs { + if let Err(e) = pool.send(&msg) { + tlog!(Error, "{e}"); + } // Send messages to other peers. } }; diff --git a/picolib/traft/storage.rs b/picolib/traft/storage.rs index b58a21601b70f14d015d639e1bc1ce80523a3e2a..8654d51e674ac1366decd902396c4fe16c91a171 100644 --- a/picolib/traft/storage.rs +++ b/picolib/traft/storage.rs @@ -111,6 +111,20 @@ impl Storage { } } + pub fn peers() -> Result<Vec<row::Peer>, StorageError> { + let mut ret = Vec::new(); + + let iter = Storage::space(RAFT_GROUP)? + .select(IteratorType::All, &()) + .map_err(box_err!())?; + + for tuple in iter { + ret.push(tuple.into_struct().map_err(box_err!())?); + } + + Ok(ret) + } + pub fn id() -> Result<Option<u64>, StorageError> { Storage::raft_state("id") }