diff --git a/picodata/main.rs b/picodata/main.rs index 3b2a738de3c56687ccb1e4d6850a2ecf2af72d95..f499bf4101771f6292fafd00ee4200919fc5f19c 100644 --- a/picodata/main.rs +++ b/picodata/main.rs @@ -66,6 +66,8 @@ fn main_run(matches: &clap::ArgMatches) { envp.insert("PICODATA_PEER".into(), peer); } + envp.entry("PICODATA_PEER".into()) + .or_insert_with(|| "127.0.0.1:3301".into()); envp.entry("PICODATA_LISTEN".into()) .or_insert_with(|| "3301".into()); envp.entry("PICODATA_DATA_DIR".into()) diff --git a/picolib/lib.rs b/picolib/lib.rs index 286156be7cc95af3f60b90053b94c42856bacaaa..ea17e80d1d3732e5e236fe70aacfb188d9ec44fd 100644 --- a/picolib/lib.rs +++ b/picolib/lib.rs @@ -90,7 +90,8 @@ pub unsafe extern "C" fn luaopen_picolib(l: *mut std::ffi::c_void) -> c_int { function inspect() return {raft_log = box.space.raft_log:fselect()}, - {raft_state = box.space.raft_state:fselect()} + {raft_state = box.space.raft_state:fselect()}, + {raft_group = box.space.raft_group:fselect()} end "#, ) @@ -115,12 +116,11 @@ fn main_run() { ..Default::default() }; - std::env::var("PICODATA_DATA_DIR").ok().map(|v| { + if let Ok(v) = std::env::var("PICODATA_DATA_DIR") { std::fs::create_dir_all(&v).unwrap(); cfg.wal_dir = v.clone(); - cfg.memtx_dir = v.clone(); - Some(v) - }); + cfg.memtx_dir = v; + }; tarantool::set_cfg(&cfg); tarantool::eval( @@ -166,6 +166,19 @@ fn main_run() { } }; + if let Ok(peers) = std::env::var("PICODATA_PEER") { + // This is a temporary hack until fair joining is implemented + let peers: Vec<_> = peers + .split(',') + .enumerate() + .map(|(i, x)| traft::row::Peer { + raft_id: (i + 1) as u64, + uri: x.to_owned(), + }) + .collect(); + traft::Storage::persist_peers(&peers); + } + let raft_cfg = raft::Config { id: raft_id, applied: traft::Storage::applied().unwrap().unwrap_or_default(), diff --git a/picolib/traft.rs b/picolib/traft.rs index 7dabd3699efe35ef0071f26c4f476f469c22eff8..06cbf8b4e2f94263ee1acfdbf494514008bf8cfa 100644 --- a/picolib/traft.rs +++ b/picolib/traft.rs @@ -8,9 +8,11 @@ pub use storage::Storage; pub mod row { mod entry; mod message; + mod peer; pub use entry::Entry; pub use message::Message; + pub use peer::Peer; } #[derive(Clone, Debug, Default, Serialize, Deserialize, Hash, PartialEq, Eq)] diff --git a/picolib/traft/row/peer.rs b/picolib/traft/row/peer.rs new file mode 100644 index 0000000000000000000000000000000000000000..276589889c108841f89259c8b20c7839c97a720f --- /dev/null +++ b/picolib/traft/row/peer.rs @@ -0,0 +1,5 @@ +#[derive(serde::Deserialize)] +pub struct Peer { + pub raft_id: u64, + pub uri: String, +} diff --git a/picolib/traft/storage.rs b/picolib/traft/storage.rs index d83fb181bcf473886c553371308a8de5dbc8a874..b58a21601b70f14d015d639e1bc1ce80523a3e2a 100644 --- a/picolib/traft/storage.rs +++ b/picolib/traft/storage.rs @@ -15,6 +15,7 @@ use crate::traft::row; pub struct Storage; +const RAFT_GROUP: &str = "raft_group"; const RAFT_STATE: &str = "raft_state"; const RAFT_LOG: &str = "raft_log"; @@ -69,6 +70,7 @@ impl Storage { is_local = true, format = { {name = 'raft_id', type = 'unsigned', is_nullable = false}, + {name = 'peer_uri', type = 'string', is_nullable = false}, -- {name = 'raft_role', type = 'string', is_nullable = false}, -- {name = 'instance_id', type = 'string', is_nullable = false}, -- {name = 'instance_uuid', type = 'string', is_nullable = false}, @@ -164,6 +166,14 @@ impl Storage { Ok(()) } + pub fn persist_peers(peers: &[row::Peer]) { + let mut space = Space::find(RAFT_GROUP).unwrap(); + space.truncate().unwrap(); + for peer in peers { + space.insert(&(peer.raft_id, &peer.uri)).unwrap(); + } + } + pub fn entries(low: u64, high: u64) -> Result<Vec<raft::Entry>, StorageError> { let mut ret: Vec<raft::Entry> = vec![]; let iter = Storage::space(RAFT_LOG)? @@ -192,6 +202,21 @@ impl Storage { Ok(()) } + pub fn conf_state() -> Result<raft::ConfState, StorageError> { + let mut ret = raft::ConfState::default(); + + let iter = Storage::space(RAFT_GROUP)? + .select(IteratorType::All, &()) + .map_err(box_err!())?; + + for tuple in iter { + let peer: row::Peer = tuple.into_struct().map_err(box_err!())?; + ret.mut_voters().push(peer.raft_id); + } + + Ok(ret) + } + pub fn hard_state() -> Result<raft::HardState, StorageError> { let mut ret = raft::HardState::default(); if let Some(term) = Storage::term()? { @@ -217,13 +242,9 @@ impl Storage { impl raft::Storage for Storage { fn initial_state(&self) -> Result<raft::RaftState, RaftError> { - let hs = Storage::hard_state()?; - // See also: https://github.com/etcd-io/etcd/blob/main/raft/raftpb/raft.pb.go - let cs = raft::ConfState { - voters: vec![1], - ..Default::default() - }; + let hs = Storage::hard_state()?; + let cs = Storage::conf_state()?; let ret = raft::RaftState::new(hs, cs); Ok(ret) diff --git a/tests/cli.rs b/tests/cli.rs index b3caf0862cf3e631f9c517bef50eeb7e72475460..8746da86119dc616246e3a10ccab2090c12ed2b4 100644 --- a/tests/cli.rs +++ b/tests/cli.rs @@ -19,6 +19,7 @@ fn positive() { error(('Assertion failed: %q ~= %q'):format(l, r), 2) end end + assert_eq(os.environ()['PICODATA_PEER'], "127.0.0.1:3301") assert_eq(os.environ()['PICODATA_LISTEN'], "3301") assert_eq(os.environ()['PICODATA_DATA_DIR'], ".") "#,