Skip to content
Snippets Groups Projects
Commit 8d137310 authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon: Committed by Georgy Moshkin
Browse files

feat: raft-sync in conf_change_loop

parent dbc29692
No related branches found
No related tags found
1 merge request!244attempt at topology governor
......@@ -456,6 +456,10 @@ impl Peer {
pub fn is_active(&self) -> bool {
matches!(self.grade, Grade::Online)
}
pub fn has_grades(&self, current: Grade, target: TargetGrade) -> bool {
self.grade == current && self.target_grade == target
}
}
impl std::fmt::Display for Peer {
......@@ -823,6 +827,12 @@ pub struct SyncRaftRequest {
}
impl Encode for SyncRaftRequest {}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SyncRaftResponse {
pub commit: u64,
}
impl Encode for SyncRaftResponse {}
///////////////////////////////////////////////////////////////////////////////
/// Activity state of an instance.
#[derive(PartialEq, Eq, Clone, Debug, Deserialize, Serialize)]
......
......@@ -282,7 +282,7 @@ impl PoolWorker {
);
let convert_result = |bytes| {
let bytes: RawByteBuf = bytes?;
let res = Decode::decode(&bytes)?;
let ((res,),) = Decode::decode(&bytes)?;
Ok(res)
};
self.inbox.send((
......
......@@ -53,12 +53,13 @@ use crate::traft::Op;
use crate::traft::Topology;
use crate::traft::TopologyRequest;
use crate::traft::{
ExpelRequest, ExpelResponse, JoinRequest, JoinResponse, SyncRaftRequest, UpdatePeerRequest,
ExpelRequest, ExpelResponse, JoinRequest, JoinResponse, SyncRaftRequest, SyncRaftResponse,
UpdatePeerRequest,
};
use crate::traft::{PeerStorage, RaftSpaceAccess, Storage};
use super::Grade;
use super::OpResult;
use super::{Grade, TargetGrade};
type RawNode = raft::RawNode<RaftSpaceAccess>;
......@@ -993,6 +994,12 @@ fn raft_conf_change_loop(
storage: RaftSpaceAccess,
peer_storage: PeerStorage,
) {
let mut pool = ConnectionPool::builder(peer_storage.clone())
.call_timeout(Duration::from_secs(1))
.connect_timeout(Duration::from_millis(500))
.inactivity_timeout(Duration::from_secs(60))
.build();
loop {
if status.borrow().raft_state != "Leader" {
event::wait(Event::StatusChanged).expect("Events system must be initialized");
......@@ -1003,6 +1010,8 @@ fn raft_conf_change_loop(
let term = storage.term().unwrap().unwrap_or(0);
let node = global().expect("must be initialized");
////////////////////////////////////////////////////////////////////////
// conf change
if let Some(conf_change) = raft_conf_change(&storage, &peers) {
// main_loop gives the warranty that every ProposeConfChange
// will sometimes be handled and there's no need in timeout.
......@@ -1018,6 +1027,102 @@ fn raft_conf_change_loop(
continue;
}
////////////////////////////////////////////////////////////////////////
// offline
let to_offline = peers
.iter()
.filter(|peer| peer.grade != Grade::Offline)
.find(|peer| peer.target_grade == TargetGrade::Offline);
if let Some(peer) = to_offline {
let instance_id = peer.instance_id.clone();
let cluster_id = storage.cluster_id().unwrap().unwrap();
let req = UpdatePeerRequest::new(instance_id, cluster_id).with_grade(Grade::Offline);
let res = node.handle_topology_request_and_wait(req.into());
if let Err(e) = res {
tlog!(Warning, "failed to set peer offline: {e}";
"instance_id" => &*peer.instance_id,
);
}
continue;
}
////////////////////////////////////////////////////////////////////////
// raft sync
let to_sync = peers
.iter()
.find(|peer| peer.has_grades(Grade::Offline, TargetGrade::Online));
if let Some(peer) = to_sync {
let commit = storage.commit().unwrap().unwrap();
let (rx, tx) = fiber::Channel::new(1).into_clones();
pool.call(
&peer.raft_id,
SyncRaftRequest {
commit,
timeout: Duration::from_secs(10),
},
move |res| tx.send(res).expect("mustn't fail"),
)
.expect("shouldn't fail");
let res = rx.recv().expect("ought not fail");
let res = res.and_then(|SyncRaftResponse { commit }| {
// TODO: change `Info` to `Debug`
tlog!(Info, "peer synced";
"commit" => commit,
"instance_id" => &*peer.instance_id,
);
let cluster_id = storage.cluster_id().unwrap().unwrap();
let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id)
.with_grade(Grade::RaftSynced);
global()
.expect("can't be deinitialized")
.handle_topology_request_and_wait(req.into())
});
match res {
Ok(peer) => {
tlog!(Info, "raft sync processed");
debug_assert!(peer.grade == Grade::RaftSynced);
}
Err(e) => {
tlog!(Warning, "raft sync failed: {e}";
"instance_id" => &*peer.instance_id,
"peer" => &peer.peer_address,
);
// TODO: don't hard code timeout
event::wait_timeout(Event::TopologyChanged, Duration::from_millis(300))
.unwrap();
}
}
continue;
}
////////////////////////////////////////////////////////////////////////
// replication
// TODO
////////////////////////////////////////////////////////////////////////
// sharding
// TODO
let to_online = peers
.iter()
.find(|peer| peer.has_grades(Grade::RaftSynced, TargetGrade::Online));
if let Some(peer) = to_online {
let instance_id = peer.instance_id.clone();
let cluster_id = node.storage.cluster_id().unwrap().unwrap();
let req = UpdatePeerRequest::new(instance_id, cluster_id).with_grade(Grade::Online);
let res = node.handle_topology_request_and_wait(req.into());
if let Err(e) = res {
tlog!(Warning, "failed to set peer online: {e}";
"instance_id" => &*peer.instance_id,
);
}
continue;
}
event::wait(Event::TopologyChanged).expect("Events system must be initialized");
}
}
......@@ -1181,11 +1286,12 @@ fn expel_on_leader(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std::erro
// NetBox entrypoint. Run on any node.
#[proc(packed_args)]
fn raft_sync_raft(req: SyncRaftRequest) -> Result<(), Box<dyn std::error::Error>> {
fn raft_sync_raft(req: SyncRaftRequest) -> Result<SyncRaftResponse, Box<dyn std::error::Error>> {
let deadline = Instant::now() + req.timeout;
loop {
if global()?.storage.commit().unwrap().unwrap() >= req.commit {
return Ok(());
let commit = global()?.storage.commit().unwrap().unwrap();
if commit >= req.commit {
return Ok(SyncRaftResponse { commit });
}
let now = Instant::now();
......@@ -1196,47 +1302,3 @@ fn raft_sync_raft(req: SyncRaftRequest) -> Result<(), Box<dyn std::error::Error>
event::wait_timeout(Event::RaftEntryApplied, deadline - now)?;
}
}
// Run on Leader
fn call_raft_sync_raft(promotee: &Peer, commit: u64) -> Result<(), Box<dyn std::error::Error>> {
let fn_name = stringify_cfunc!(traft::node::raft_sync_raft);
let req = SyncRaftRequest {
commit,
timeout: Duration::from_secs(10),
};
match crate::tarantool::net_box_call(&promotee.peer_address, fn_name, &req, Duration::MAX) {
Ok::<(), _>(_) => Ok(()),
Err(e) => Err(Box::new(e)),
}
}
// Run on Leader by topology governor
fn sync_raft(promotee: &Peer) -> Result<(), Box<dyn std::error::Error>> {
let commit = global()?.storage.commit().unwrap().unwrap();
match call_raft_sync_raft(promotee, commit) {
Ok(_) => {
let node = global()?;
let leader_id = node.status().leader_id.ok_or("leader_id not found")?;
if node.raft_id() != leader_id {
return Err(Box::from("not a leader"));
}
let instance_id = promotee.instance_id.clone();
let cluster_id = node
.storage
.cluster_id()?
.ok_or("cluster_id is not set yet")?;
let req = UpdatePeerRequest::new(instance_id, cluster_id).with_grade(Grade::RaftSynced);
node.handle_topology_request_and_wait(req.into())?;
Ok(())
}
Err(e) => Err(e),
}
}
......@@ -2,6 +2,8 @@ use ::tarantool::tuple::{DecodeOwned, Encode};
use std::fmt::Debug;
use serde::de::DeserializeOwned;
/// Types implementing this trait represent an RPC's (remote procedure call)
/// arguments. This trait contains information about the request.
pub trait Request: Encode + DecodeOwned {
......@@ -9,7 +11,7 @@ pub trait Request: Encode + DecodeOwned {
const PROC_NAME: &'static str;
/// Describes data returned from a successful RPC request.
type Response: Encode + DecodeOwned + Debug + 'static;
type Response: Encode + DeserializeOwned + Debug + 'static;
}
impl Request for super::JoinRequest {
......@@ -29,5 +31,5 @@ impl Request for super::ExpelRequest {
impl Request for super::SyncRaftRequest {
const PROC_NAME: &'static str = crate::stringify_cfunc!(super::node::raft_sync_raft);
type Response = ();
type Response = super::SyncRaftResponse;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment