Skip to content
Snippets Groups Projects
Verified Commit ac2a3c26 authored by Yaroslav Dynnikov's avatar Yaroslav Dynnikov
Browse files

feature: propose_wait_applied

parent 761f799c
No related branches found
No related tags found
No related merge requests found
Pipeline #3514 failed
......@@ -19,6 +19,7 @@ use message::Message;
use std::cell::Ref;
use std::cell::RefCell;
use std::convert::TryFrom;
use std::time::Duration;
#[derive(Default)]
pub struct Stash {
......@@ -75,7 +76,12 @@ pub unsafe extern "C" fn luaopen_picolib(l: *mut std::ffi::c_void) -> c_int {
);
luamod.set(
"raft_propose_eval",
tlua::function1(|x: String| raft_propose(Message::EvalLua { code: x })),
tlua::function2(|timeout: f64, x: String| {
raft_propose_wait_applied(
Message::EvalLua { code: x },
Duration::from_secs_f64(timeout),
)
}),
);
{
l.exec(
......@@ -190,6 +196,16 @@ fn raft_propose(msg: Message) {
tlog!(Debug, ",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,");
}
fn raft_propose_wait_applied(msg: Message, timeout: Duration) -> bool {
let stash = Stash::access();
let raft_ref = stash.raft_node();
let raft_node = raft_ref.as_ref().expect("Picodata not running yet");
tlog!(Debug, "propose {:?} ................................", msg);
let res = raft_node.propose_wait_applied(&msg, timeout);
tlog!(Debug, ",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,");
res
}
fn handle_committed_data(data: &[u8]) {
use Message::*;
......
......@@ -2,6 +2,8 @@ mod node;
mod storage;
pub use node::Node;
use serde::{Deserialize, Serialize};
use std::convert::TryFrom;
pub use storage::Storage;
pub mod row {
mod entry;
......@@ -10,3 +12,37 @@ pub mod row {
pub use entry::Entry;
pub use message::Message;
}
#[derive(Clone, Debug, Serialize, Deserialize, Hash, PartialEq, Eq)]
pub struct LogicalClock {
id: u64,
gen: u64,
count: u64,
}
impl LogicalClock {
pub fn new() -> Self {
let id = Storage::id().unwrap().unwrap();
let gen = Storage::gen().unwrap().unwrap_or(0) + 1;
Storage::persist_gen(gen).unwrap();
Self { id, gen, count: 0 }
}
pub fn inc(&mut self) {
self.count += 1;
}
}
impl TryFrom<&[u8]> for LogicalClock {
type Error = rmp_serde::decode::Error;
fn try_from(data: &[u8]) -> Result<Self, Self::Error> {
rmp_serde::from_read_ref(data)
}
}
impl From<&LogicalClock> for Vec<u8> {
fn from(lc: &LogicalClock) -> Vec<u8> {
rmp_serde::to_vec(lc).unwrap()
}
}
......@@ -2,14 +2,18 @@ use ::raft::prelude as raft;
use ::raft::Error as RaftError;
use ::tarantool::fiber;
use ::tarantool::util::IntoClones;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::time::Duration;
use std::time::Instant;
use crate::tlog;
use crate::traft::LogicalClock;
use crate::traft::Storage;
type RawNode = raft::RawNode<Storage>;
type Notify = fiber::Channel<()>;
pub struct Node {
_main_loop: fiber::LuaUnitJoinHandle,
......@@ -18,7 +22,8 @@ pub struct Node {
#[derive(Clone, Debug)]
enum Request {
Propose(Vec<u8>),
Propose { data: Vec<u8> },
ProposeWaitApplied { data: Vec<u8>, notify: Notify },
Step(raft::Message),
}
......@@ -36,11 +41,38 @@ impl Node {
})
}
pub fn propose<T: Into<Vec<u8>>>(&self, data: T) {
let req = Request::Propose(data.into());
pub fn propose(&self, data: impl Into<Vec<u8>>) {
let req = Request::Propose { data: data.into() };
self.inbox.send(req).unwrap();
}
pub fn propose_wait_applied(&self, data: impl Into<Vec<u8>>, timeout: Duration) -> bool {
let (rx, tx) = fiber::Channel::new(1).into_clones();
let now = Instant::now();
let req = Request::ProposeWaitApplied {
data: data.into(),
notify: tx,
};
match self.inbox.send_timeout(req, timeout) {
Err(fiber::SendError::Disconnected(_)) => unreachable!(),
Err(fiber::SendError::Timeout(_)) => {
rx.close();
return false;
}
Ok(()) => (),
}
match rx.recv_timeout(timeout.saturating_sub(now.elapsed())) {
Err(_) => {
rx.close();
false
}
Ok(()) => true,
}
}
pub fn step(&self, msg: raft::Message) {
let req = Request::Step(msg);
self.inbox.send(req).unwrap();
......@@ -50,13 +82,32 @@ impl Node {
fn raft_main(inbox: fiber::Channel<Request>, mut raw_node: RawNode, on_commit: fn(&[u8])) {
let mut next_tick = Instant::now() + Node::TICK;
let mut notifications: HashMap<LogicalClock, Notify> = HashMap::new();
let mut lc = LogicalClock::new();
loop {
// Clean up obsolete notifications
notifications.retain(|_, notify: &mut Notify| !notify.is_closed());
match inbox.recv_timeout(Node::TICK) {
Ok(Request::Propose(data)) => {
raw_node.propose(vec![], data).unwrap();
Ok(Request::Propose { data }) => {
if let Err(e) = raw_node.propose(vec![], data) {
tlog!(Error, "{e}");
}
}
Ok(Request::ProposeWaitApplied { data, notify }) => {
lc.inc();
if let Err(e) = raw_node.propose(Vec::from(&lc), data) {
tlog!(Error, "{e}");
notify.close();
} else {
notifications.insert(lc.clone(), notify);
}
}
Ok(Request::Step(msg)) => {
raw_node.step(msg).unwrap();
if let Err(e) = raw_node.step(msg) {
tlog!(Error, "{e}");
}
}
Err(fiber::RecvError::Timeout) => (),
Err(fiber::RecvError::Disconnected) => unreachable!(),
......@@ -91,12 +142,17 @@ fn raft_main(inbox: fiber::Channel<Request>, mut raw_node: RawNode, on_commit: f
unimplemented!();
}
let handle_committed_entries = |committed_entries: Vec<raft::Entry>| {
let mut handle_committed_entries = |committed_entries: Vec<raft::Entry>| {
for entry in committed_entries {
Storage::persist_applied(entry.index).unwrap();
if entry.get_entry_type() == raft::EntryType::EntryNormal {
on_commit(entry.get_data())
on_commit(entry.get_data());
if let Ok(lc) = LogicalClock::try_from(entry.get_context()) {
if let Some(notify) = notifications.remove(&lc) {
notify.try_send(()).ok();
}
}
}
// TODO: handle EntryConfChange
......
......@@ -4,6 +4,7 @@ use serde::Serialize;
use std::convert::TryFrom;
use crate::error::CoercionError;
use crate::traft::LogicalClock;
use crate::Message;
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
......@@ -11,7 +12,10 @@ pub struct Entry {
pub entry_type: String,
pub index: u64,
pub term: u64,
#[serde(default)]
pub msg: Message,
#[serde(default)]
pub ctx: Option<LogicalClock>,
}
impl ::tarantool::tuple::AsTuple for Entry {}
......@@ -24,6 +28,7 @@ impl TryFrom<raft::Entry> for self::Entry {
index: e.get_index(),
term: e.get_term(),
msg: Message::try_from(e.get_data())?,
ctx: LogicalClock::try_from(e.get_context()).ok(),
})
}
}
......@@ -47,6 +52,11 @@ impl TryFrom<self::Entry> for raft::Entry {
let bytes: Vec<u8> = Vec::from(&row.msg);
ret.set_data(bytes.into());
if let Some(ctx) = row.ctx {
let ctx: Vec<u8> = Vec::from(&ctx);
ret.set_context(ctx.into());
}
Ok(ret)
}
}
......@@ -85,12 +95,35 @@ inventory::submit!(crate::InnerTest {
assert_eq!(
ser(Entry::default()),
json!(["EntryNormal", 0u64, 0u64, ["empty"]])
json!(["EntryNormal", 0u64, 0u64, ["empty"], null])
);
assert_eq!(
ser(Entry::new(Message::Info { msg: "!".into() })),
json!(["EntryNormal", 0u64, 0u64, ["info", "!"]])
json!(["EntryNormal", 0u64, 0u64, ["info", "!"], null])
);
assert_eq!(
ser(Entry {
entry_type: "EntryNormal".into(),
index: 1001,
term: 1002,
msg: Message::EvalLua {
code: "return nil".into(),
},
ctx: Some(LogicalClock {
id: 1,
gen: 2,
count: 101
}),
}),
json!([
"EntryNormal",
1001u64,
1002u64,
["eval_lua", "return nil"],
[1, 2, 101],
])
);
assert_eq!(
......@@ -101,8 +134,15 @@ inventory::submit!(crate::InnerTest {
msg: Message::EvalLua {
code: "return nil".into(),
},
ctx: None,
}),
json!(["EntryNormal", 1001u64, 1002u64, ["eval_lua", "return nil"]])
json!([
"EntryNormal",
1001u64,
1002u64,
["eval_lua", "return nil"],
null,
])
);
let msg = Message::Info { msg: "?".into() };
......@@ -112,6 +152,7 @@ inventory::submit!(crate::InnerTest {
index: 99,
term: 2,
msg: msg.clone(),
ctx: None,
})
.expect("coercing raft::Entry from self::Entry failed"),
raft::Entry {
......
......@@ -42,6 +42,7 @@ impl Storage {
{name = 'index', type = 'unsigned', is_nullable = false},
{name = 'term', type = 'unsigned', is_nullable = false},
{name = 'msg', type = 'any', is_nullable = true},
{name = 'ctx', type = 'any', is_nullable = true},
}
})
box.space.raft_log:create_index('pk', {
......@@ -112,6 +113,11 @@ impl Storage {
Storage::raft_state("id")
}
/// Node generation i.e. the number of restarts.
pub fn gen() -> Result<Option<u64>, StorageError> {
Storage::raft_state("gen")
}
pub fn term() -> Result<Option<u64>, StorageError> {
Storage::raft_state("term")
}
......@@ -144,6 +150,10 @@ impl Storage {
Storage::persist_raft_state("vote", vote)
}
pub fn persist_gen(gen: u64) -> Result<(), StorageError> {
Storage::persist_raft_state("gen", gen)
}
pub fn persist_id(id: u64) -> Result<(), StorageError> {
Storage::space(RAFT_STATE)?
// We use `insert` instead of `replace` here
......@@ -300,9 +310,7 @@ inventory::submit!(crate::InnerTest {
let mut raft_log = Storage::space("raft_log").unwrap();
raft_log
.put(&("EntryUnknown", 99, 1, vec!["empty"]))
.unwrap();
raft_log.put(&("EntryUnknown", 99, 1)).unwrap();
assert_err!(
Storage.entries(1, 100, u64::MAX),
"unknown error unknown entry type \"EntryUnknown\""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment