diff --git a/picolib/lib.rs b/picolib/lib.rs index 98137abaa97b458ae47d2dfa07573b98f402b344..090a51684a9017ff258d79aaebc138cfbdc32f54 100644 --- a/picolib/lib.rs +++ b/picolib/lib.rs @@ -1,8 +1,8 @@ -use slog::{debug, info, o, error}; -use std::os::raw::c_int; use ::tarantool::tlua; use rmp_serde; use serde::{Deserialize, Serialize}; +use slog::{debug, error, info, o}; +use std::os::raw::c_int; mod tarantool; mod traft; @@ -13,9 +13,9 @@ pub struct InnerTest { } inventory::collect!(InnerTest); +use std::cell::Ref; use std::cell::RefCell; use std::cell::RefMut; -use std::cell::Ref; use std::convert::{TryFrom, TryInto}; use std::rc::Rc; @@ -46,9 +46,9 @@ pub extern "C" fn luaopen_picolib(l: *mut std::ffi::c_void) -> c_int { match command.as_deref() { Ok("run") => { main_run(&stash); - }, - Ok(_) => {}, - Err(_) => {}, + } + Ok(_) => {} + Err(_) => {} } unsafe { @@ -76,29 +76,29 @@ pub extern "C" fn luaopen_picolib(l: *mut std::ffi::c_void) -> c_int { let stash = stash.clone(); luamod.set( "raft_test_propose", - tlua::function1(move |x: String| - raft_propose(&stash, RaftEntryData::Info(format!("{}", x)))), + tlua::function1(move |x: String| { + raft_propose(&stash, RaftEntryData::Info(format!("{}", x))) + }), ); } { let stash = stash.clone(); luamod.set( "broadcast_lua_eval", - tlua::function1( - move |x: String| { - raft_propose(&stash, RaftEntryData::EvalLua(x)) - } - ) + tlua::function1(move |x: String| raft_propose(&stash, RaftEntryData::EvalLua(x))), ) } { - l.exec(r#" + l.exec( + r#" function inspect() return {raft_log = box.space.raft_log:fselect()}, {raft_state = box.space.raft_state:fselect()} end - "#).unwrap(); + "#, + ) + .unwrap(); } use tlua::AsLua; @@ -204,9 +204,7 @@ fn handle_committed_data(logger: &slog::Logger, data: &[u8]) { Ok(x) => match x { EvalLua(code) => crate::tarantool::eval(&code), Info(msg) => info!(logger, "{}", msg), - } + }, Err(why) => error!(logger, "cannot decode raft entry data: {}", why), } } - - diff --git a/picolib/traft/mod.rs b/picolib/traft/mod.rs index cc0afbf74c1e9a2eaec9422be734cd6bcc6c44f8..cc1c7ab08b5764937c5fe427d3d95ed1c37bc249 100644 --- a/picolib/traft/mod.rs +++ b/picolib/traft/mod.rs @@ -4,8 +4,8 @@ mod storage; pub use node::Node; pub use storage::Storage; -pub use raft::Ready; -pub use raft::Config; -pub use raft::eraftpb::Message; pub use raft::eraftpb::Entry; +pub use raft::eraftpb::Message; +pub use raft::Config; +pub use raft::Ready; // pub use raft::prelude::*; diff --git a/picolib/traft/node.rs b/picolib/traft/node.rs index 812a433d892f30733a2b3d9abe27d5eb2425eeae..3bbc2ca98266462f8b0b3b70201a1a130fc0e234 100644 --- a/picolib/traft/node.rs +++ b/picolib/traft/node.rs @@ -1,16 +1,13 @@ -use slog::{ - info, -}; -use std::ops::{Deref, DerefMut}; use raft::prelude::*; use raft::Error as RaftError; +use slog::info; +use std::ops::{Deref, DerefMut}; use std::cell::RefCell; use std::rc::Rc; use std::time::Duration; - use super::storage::Storage; use crate::tarantool::SlogDrain; use ::tarantool::fiber; @@ -25,11 +22,15 @@ pub struct Node { } impl Node { - pub fn new(cfg: &raft::Config) -> Result<Self, RaftError> { + pub fn new(cfg: &raft::Config) -> Result<Self, RaftError> { let logger = slog::Logger::root(SlogDrain, slog::o!()); let raw_node = RawNode::new(cfg, Storage, &logger)?; let raw_node = Rc::from(RefCell::from(raw_node)); - let ret = Node {logger, raw_node, main_loop: None}; + let ret = Node { + logger, + raw_node, + main_loop: None, + }; Ok(ret) } diff --git a/picolib/traft/storage.rs b/picolib/traft/storage.rs index 45963e9bb2d7df402436f6970b963ca5ac2e202a..cf2fbab5b437d22b5c0d2ccbb1084456760bee78 100644 --- a/picolib/traft/storage.rs +++ b/picolib/traft/storage.rs @@ -1,10 +1,10 @@ -use slog::{debug, o}; +use ::tarantool::index::IteratorType; +use ::tarantool::space::Space; +use ::tarantool::tuple::Tuple; use raft::eraftpb::ConfState; use raft::StorageError; use serde::{Deserialize, Serialize}; -use ::tarantool::space::Space; -use ::tarantool::tuple::Tuple; -use ::tarantool::index::IteratorType; +use slog::{debug, o}; use raft::prelude::*; use raft::Error as RaftError; @@ -104,10 +104,13 @@ impl Storage { space.replace(&("applied", applied)).unwrap(); } - pub fn entries(low: u64, high: u64,) -> Vec<Entry> { + pub fn entries(low: u64, high: u64) -> Vec<Entry> { let mut ret: Vec<Entry> = vec![]; let space = Space::find("raft_log").unwrap(); - let iter = space.primary_key().select(IteratorType::GE, &(low,)).unwrap(); + let iter = space + .primary_key() + .select(IteratorType::GE, &(low,)) + .unwrap(); for tuple in iter { let row: LogRow = tuple.into_struct().unwrap(); @@ -177,13 +180,12 @@ impl raft::Storage for Storage { let space = Space::find("raft_log").unwrap(); let tuple = space.primary_key().get(&(idx,)).unwrap(); - let row: Option<LogRow> = tuple - .and_then(|t| t.into_struct().unwrap()); + let row: Option<LogRow> = tuple.and_then(|t| t.into_struct().unwrap()); let logger = slog::Logger::root(crate::tarantool::SlogDrain, o!()); if let Some(row) = row { debug!(logger, "+++ term(idx={}) -> {:?}", idx, row.raft_term); - return Ok(row.raft_term) + return Ok(row.raft_term); } else { debug!(logger, "+++ term(idx={}) -> Unavailable", idx); return Err(RaftError::Store(StorageError::Unavailable)); @@ -197,11 +199,8 @@ impl raft::Storage for Storage { fn last_index(&self) -> Result<u64, RaftError> { let space: Space = Space::find("raft_log").unwrap(); let tuple: Option<Tuple> = space.primary_key().max(&()).unwrap(); - let row: Option<LogRow> = tuple - .and_then(|t| t.into_struct().unwrap()); - let ret: u64 = row - .map(|row| row.raft_index) - .unwrap_or(0); + let row: Option<LogRow> = tuple.and_then(|t| t.into_struct().unwrap()); + let ret: u64 = row.map(|row| row.raft_index).unwrap_or(0); let logger = slog::Logger::root(crate::tarantool::SlogDrain, o!()); debug!(logger, "+++ last_index() -> {:?}", ret); @@ -210,7 +209,10 @@ impl raft::Storage for Storage { fn snapshot(&self, request_index: u64) -> Result<Snapshot, RaftError> { let logger = slog::Logger::root(crate::tarantool::SlogDrain, o!()); - debug!(logger, "+++ snapshot(idx={}) -> unimplemented", request_index); + debug!( + logger, + "+++ snapshot(idx={}) -> unimplemented", request_index + ); unimplemented!(); } } @@ -251,7 +253,6 @@ impl From<LogRow> for Entry { ret.set_data(row.data.into()); ret } - } impl ::tarantool::tuple::AsTuple for LogRow {}