fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
let mut pool = ConnectionPool::builder(storage.peers.clone())
.call_timeout(Duration::from_secs(1))
.connect_timeout(Duration::from_millis(500))
.inactivity_timeout(Duration::from_secs(60))
.build();
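// Governor loop: while this instance is the raft leader it walks peers through
// the lifecycle stages below (conf change, offline, raft sync, replication,
// sharding, weights), one step per iteration.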
'governor: loop {
if !status.get().raft_state.is_leader() {
event::wait(Event::StatusChanged).expect("Events system must be initialized");
continue 'governor;
}
let leader_id = status.get().id;
let peers = storage.peers.all_peers().unwrap();
let term = status.get().term;
let cluster_id = storage.raft.cluster_id().unwrap().unwrap();
let node = global().expect("must be initialized");
////////////////////////////////////////////////////////////////////////
// conf change
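// Propose any pending change to the raft voter/learner set derived from the
// current set of peers and their grades.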
if let Some(conf_change) = raft_conf_change(&storage.raft, &peers) {
// main_loop guarantees that every ProposeConfChange
// will eventually be handled, so there's no need for a timeout.
// It also guarantees that the notification will arrive only
// after the node leaves the joint state.
match node.propose_conf_change_and_wait(term, conf_change) {
Ok(()) => tlog!(Info, "conf_change processed"),
Err(e) => {
tlog!(Warning, "conf_change failed: {e}");
fiber::sleep(Duration::from_secs(1));
}
}
}
////////////////////////////////////////////////////////////////////////
// offline
let to_offline = peers
.iter()
.filter(|peer| peer.current_grade != CurrentGrade::Offline)
// TODO: process them all, not just the first one
.find(|peer| peer.target_grade == TargetGrade::Offline);
if let Some(peer) = to_offline {
let res = (|| -> Result<_, Error> {
let reqs = maybe_responding(&peers)
.filter(|peer| {
peer.current_grade == CurrentGrade::ShardingInitialized
|| peer.current_grade == CurrentGrade::Online
})
.map(|peer| {
(
peer.instance_id.clone(),
sharding::Request {
leader_and_term: LeaderWithTerm { leader_id, term },
..Default::default()
},
)
});
// TODO: don't hard code timeout
let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;
for (peer_iid, resp) in res {
let sharding::Response {} = resp?;
// TODO: change `Info` to `Debug`
tlog!(Info, "sharding reconfigured on peer";
"instance_id" => &*peer_iid,
);
}
Ok(())
})();
if let Err(e) = res {
tlog!(Warning, "failed to reconfigure sharding: {e}");
// TODO: don't hard code timeout
event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
continue 'governor;
}
let instance_id = peer.instance_id.clone();
let req = UpdatePeerRequest::new(instance_id, cluster_id.clone())
.with_current_grade(CurrentGrade::Offline);
let res = node.handle_topology_request_and_wait(req.into());
if let Err(e) = res {
tlog!(Warning, "failed to set peer offline: {e}";
"instance_id" => &*peer.instance_id,
);
}
}
////////////////////////////////////////////////////////////////////////
// raft sync
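// A peer whose target grade is Online must first catch its raft log up to the
// leader's commit index before it can be configured any further.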
// TODO: putting each stage into a separate function
// would make the control flow more readable
let to_sync = peers
.iter()
.find(|peer| peer.has_grades(CurrentGrade::Offline, TargetGrade::Online));
if let Some(peer) = to_sync {
let commit = storage.raft.commit().unwrap().unwrap();
let (rx, tx) = fiber::Channel::new(1).into_clones();
pool.call(
&peer.raft_id,
SyncRaftRequest {
commit,
timeout: Duration::from_secs(10),
},
move |res| tx.send(res).expect("mustn't fail"),
)
.expect("shouldn't fail");
let res = rx.recv().expect("ought not fail");
let res = res.and_then(|SyncRaftResponse { commit }| {
// TODO: change `Info` to `Debug`
tlog!(Info, "peer synced";
"commit" => commit,
"instance_id" => &*peer.instance_id,
);
let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id)
.with_current_grade(CurrentGrade::RaftSynced);
global()
.expect("can't be deinitialized")
.handle_topology_request_and_wait(req.into())
});
match res {
Ok(peer) => {
tlog!(Info, "raft sync processed");
debug_assert!(peer.current_grade == CurrentGrade::RaftSynced);
}
Err(e) => {
tlog!(Warning, "raft sync failed: {e}";
"instance_id" => &*peer.instance_id,
"peer" => &peer.peer_address,
);
// TODO: don't hard code timeout
event::wait_timeout(Event::TopologyChanged, Duration::from_millis(300))
.unwrap();
}
}
}
////////////////////////////////////////////////////////////////////////
// replication
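// Configure tarantool replication across the peer's replicaset and promote the
// peer to the Replicated grade.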
let to_replicate = peers
.iter()
// TODO: find all such peers in a given replicaset,
// not just the first one
.find(|peer| peer.has_grades(CurrentGrade::RaftSynced, TargetGrade::Online));
if let Some(peer) = to_replicate {
let replicaset_id = &peer.replicaset_id;
let replicaset_iids = maybe_responding(&peers)
.filter(|peer| &peer.replicaset_id == replicaset_id)
.map(|peer| peer.instance_id.clone())
.collect::<Vec<_>>();
let replicaset_size = replicaset_iids.len();
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
let res = (|| -> Result<_, Error> {
let reqs = replicaset_iids
.iter()
.cloned()
.zip(repeat(replication::Request {
leader_and_term: LeaderWithTerm { leader_id, term },
replicaset_instances: replicaset_iids.clone(),
replicaset_id: replicaset_id.clone(),
// TODO: what if someone goes offline/expelled?
promote: replicaset_size == 1,
}));
// TODO: don't hard code timeout
let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;
for (peer_iid, resp) in res {
let replication::Response { lsn } = resp?;
// TODO: change `Info` to `Debug`
tlog!(Info, "configured replication with peer";
"instance_id" => &*peer_iid,
"lsn" => lsn,
);
}
let mut req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id)
.with_current_grade(CurrentGrade::Replicated);
if replicaset_size == 1 {
// TODO: ignore expelled peers
// TODO: ignore offline peers
req = req.with_is_master(true);
}
node.handle_topology_request_and_wait(req.into())?;
Ok(())
})();
if let Err(e) = res {
tlog!(Warning, "failed to configure replication: {e}");
// TODO: don't hard code timeout
event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
continue 'governor;
}
let replicaset_weight = storage
.state
.replicaset_weight(replicaset_id)
.expect("storage error");
if replicaset_weight.is_none() {
if let Err(e) = (|| -> Result<(), Error> {
let vshard_bootstrapped = storage.state.vshard_bootstrapped()?;
let weight = if vshard_bootstrapped { 0. } else { 1. };
let mut ops = UpdateOps::new();
ops.assign(format!("['value']['{replicaset_id}']"), weight)?;
let req = traft::OpDML::update(
ClusterSpace::State,
&[StateKey::ReplicasetWeights],
ops,
)?;
// TODO: don't hard code the timeout
node.propose_and_wait(req, Duration::from_secs(3))??;
Ok(())
})() {
// TODO: what if all replicas have changed their grade
// successfully, but the replicaset_weight failed to set?
tlog!(Warning, "failed to set replicaset weight: {e}";
"replicaset_id" => replicaset_id,
);
// TODO: don't hard code timeout
event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
continue 'governor;
}
}
tlog!(Info, "configured replication"; "replicaset_id" => replicaset_id);
continue 'governor;
}
////////////////////////////////////////////////////////////////////////
// init sharding
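// Configure vshard on every responding peer; the request addressed to the
// leader itself also carries the bootstrap flag until vshard is bootstrapped.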
let to_shard = peers
.iter()
.find(|peer| peer.has_grades(CurrentGrade::Replicated, TargetGrade::Online));
if let Some(peer) = to_shard {
let res = (|| -> Result<(), Error> {
let vshard_bootstrapped = storage.state.vshard_bootstrapped()?;
let reqs = maybe_responding(&peers).map(|peer| {
(
peer.instance_id.clone(),
sharding::Request {
leader_and_term: LeaderWithTerm { leader_id, term },
bootstrap: !vshard_bootstrapped && peer.raft_id == leader_id,
..Default::default()
},
)
});
// TODO: don't hard code timeout
let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;
for (peer_iid, resp) in res {
let sharding::Response {} = resp?;
// TODO: change `Info` to `Debug`
tlog!(Info, "initialized sharding with peer";
"instance_id" => &*peer_iid,
);
}
let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id)
.with_current_grade(CurrentGrade::ShardingInitialized);
node.handle_topology_request_and_wait(req.into())?;
if !vshard_bootstrapped {
// TODO: if this fails, it will only rerun next time vshard
// gets reconfigured
node.propose_and_wait(
traft::OpDML::replace(
ClusterSpace::State,
&(StateKey::VshardBootstrapped, true),
)?,
// TODO: don't hard code the timeout
Duration::from_secs(3),
)??;
}
Ok(())
})();
if let Err(e) = res {
tlog!(Warning, "failed to initialize sharding: {e}");
// TODO: don't hard code timeout
event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
continue 'governor;
}
tlog!(Info, "sharding is initialized");
continue 'governor;
}
////////////////////////////////////////////////////////////////////////
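// sharding weights
// Once a replicaset reaches the replication factor, its weight is raised from
// 0 to 1 and the new weights are pushed to every responding peer.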
let to_update_weights = peers
.iter()
.find(|peer| peer.has_grades(CurrentGrade::ShardingInitialized, TargetGrade::Online));
if let Some(peer) = to_update_weights {
let res = if let Some(new_weights) =
get_new_weights(maybe_responding(&peers), &storage.state)
{
(|| -> Result<(), Error> {
let peer_ids = maybe_responding(&peers).map(|peer| peer.instance_id.clone());
let reqs = peer_ids.zip(repeat(sharding::Request {
leader_and_term: LeaderWithTerm { leader_id, term },
weights: Some(new_weights.clone()),
..Default::default()
}));
// TODO: don't hard code timeout
let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;
for (peer_iid, resp) in res {
resp?;
// TODO: change `Info` to `Debug`
tlog!(Info, "peer is online"; "instance_id" => &*peer_iid);
}
let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id)
.with_current_grade(CurrentGrade::Online);
node.handle_topology_request_and_wait(req.into())?;
// TODO: if this fails, it will only rerun next time vshard
// gets reconfigured
node.propose_and_wait(
// TODO: OpDML::update with just the changes
traft::OpDML::replace(
ClusterSpace::State,
&(StateKey::ReplicasetWeights, new_weights),
)?,
// TODO: don't hard code the timeout
Duration::from_secs(3),
)??;
Ok(())
})()
} else {
(|| -> Result<(), Error> {
let to_online = peers.iter().filter(|peer| {
peer.has_grades(CurrentGrade::ShardingInitialized, TargetGrade::Online)
});
for Peer { instance_id, .. } in to_online {
let cluster_id = cluster_id.clone();
let req = UpdatePeerRequest::new(instance_id.clone(), cluster_id)
.with_current_grade(CurrentGrade::Online);
node.handle_topology_request_and_wait(req.into())?;
// TODO: change `Info` to `Debug`
tlog!(Info, "peer is online"; "instance_id" => &**instance_id);
}
Ok(())
})()
};
if let Err(e) = res {
tlog!(Warning, "updating sharding weights failed: {e}");
// TODO: don't hard code timeout
event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
continue 'governor;
}
tlog!(Info, "sharding is configured");
continue 'governor;
}
event::wait(Event::TopologyChanged).expect("Events system must be initialized");
}
}
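/// Fans `reqs` out through the connection pool and collects each peer's
/// response, returning `Error::Timeout` unless all of them arrive within `timeout`.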
#[allow(clippy::type_complexity)]
fn call_all<R, I>(
pool: &mut ConnectionPool,
reqs: impl IntoIterator<Item = (I, R)>,
timeout: Duration,
) -> Result<Vec<(I, Result<R::Response, Error>)>, Error>
where
R: traft::rpc::Request,
I: traft::network::PeerId + Clone + std::fmt::Debug + 'static,
{
// TODO: this machinery is only needed to wait until the results of all
// the calls are ready. There are several ways to refactor this:
// - we could use a std-style channel that unblocks the reading end
// once all the writing ends have dropped
// (fiber::Channel cannot do that for now)
// - using the std Futures we could use futures::join!
//
// Those things aren't implemented yet, so this is what we do
let reqs = reqs.into_iter().collect::<Vec<_>>();
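// N.B.: the unsynchronized static below is tolerable only because tarantool
// fibers are scheduled cooperatively on a single thread.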
static mut SENT_COUNT: usize = 0;
unsafe { SENT_COUNT = 0 };
let (cond_rx, cond_tx) = Rc::new(fiber::Cond::new()).into_clones();
let peer_count = reqs.len();
let (rx, tx) = fiber::Channel::new(peer_count as _).into_clones();
for (id, req) in reqs {
let tx = tx.clone();
let cond_tx = cond_tx.clone();
let id_copy = id.clone();
pool.call(&id, req, move |res| {
tx.send((id_copy, res)).expect("mustn't fail");
unsafe { SENT_COUNT += 1 };
if unsafe { SENT_COUNT } == peer_count {
cond_tx.signal()
}
})
.expect("shouldn't fail");
}
// TODO: don't hard code timeout
if !cond_rx.wait_timeout(timeout) {
return Err(Error::Timeout);
}
Ok(rx.into_iter().take(peer_count).collect())
}
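/// Returns the updated replicaset weights if any replicaset has reached the
/// replication factor while its weight is still 0, or `None` if nothing changed.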
#[inline(always)]
fn get_new_weights<'p>(
peers: impl IntoIterator<Item = &'p Peer>,
state: &State,
) -> Option<ReplicasetWeights> {
let replication_factor = state.replication_factor().expect("storage error");
let mut replicaset_weights = state.replicaset_weights().expect("storage error");
let mut replicaset_sizes = HashMap::new();
let mut weights_changed = false;
for peer @ Peer { replicaset_id, .. } in peers {
if !peer.may_respond() {
continue;
}
let replicaset_size = replicaset_sizes.entry(replicaset_id.clone()).or_insert(0);
*replicaset_size += 1;
if *replicaset_size >= replication_factor && replicaset_weights[replicaset_id] == 0. {
weights_changed = true;
*replicaset_weights.get_mut(replicaset_id).unwrap() = 1.;
}
}
weights_changed.then_some(replicaset_weights)
}
#[inline(always)]
fn maybe_responding(peers: &[Peer]) -> impl Iterator<Item = &Peer> {
peers.iter().filter(|peer| peer.may_respond())
}
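/// Builds a single conf change step that adds `node_id` either as a voter or
/// as a learner.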
fn conf_change_single(node_id: RaftId, is_voter: bool) -> raft::ConfChangeSingle {
let change_type = if is_voter {
raft::ConfChangeType::AddNode
} else {
raft::ConfChangeType::AddLearnerNode
};
raft::ConfChangeSingle {
change_type,
node_id,
..Default::default()
}
}
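// The global raft node singleton: set once via `set_global()`, accessed
// through `global()`.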
static mut RAFT_NODE: Option<Box<Node>> = None;
pub fn set_global(node: Node) {
unsafe {
assert!(
RAFT_NODE.is_none(),
"discovery::set_global() called twice, it's a leak"
);
RAFT_NODE = Some(Box::new(node));
}
}
pub fn global() -> Result<&'static Node, Error> {
// Uninitialized raft node is a regular case. This case may take
// place while the instance is executing `start_discover()` function.
// It has already started listening, but the node is only initialized
// in `postjoin()`.
unsafe { RAFT_NODE.as_deref() }.ok_or(Error::Uninitialized)
}
fn raft_interact(pbs: Vec<traft::MessagePb>) -> Result<(), Box<dyn std::error::Error>> {
let node = global()?;
for pb in pbs {
node.step_and_yield(raft::Message::try_from(pb)?);
}
Ok(())
}
#[proc(packed_args)]
fn raft_join(req: JoinRequest) -> Result<JoinResponse, Box<dyn std::error::Error>> {
let node = global()?;
let cluster_id = node
.storage
.cluster_id()?
.ok_or("cluster_id is not set yet")?;
if req.cluster_id != cluster_id {
return Err(Box::new(Error::ClusterIdMismatch {
instance_cluster_id: req.cluster_id,
cluster_cluster_id: cluster_id,
}));
}
let peer = node.handle_topology_request_and_wait(req.into())?;
let box_replication = node
.storage
.peers
.replicaset_peer_addresses(&peer.replicaset_id, Some(peer.commit_index))?;
// A joined peer needs to communicate with other nodes.
// Provide it the list of raft voters in response.
let mut raft_group = vec![];
for raft_id in node.storage.raft.voters()?.unwrap_or_default().into_iter() {
match node.storage.peers.get(&raft_id) {
Err(e) => {
crate::warn_or_panic!("failed reading peer with id `{}`: {}", raft_id, e);
}
Ok(peer) => raft_group.push(peer),
}
}
Ok(JoinResponse {
peer,
raft_group,
box_replication,
})
}
pub fn expel_wrapper(instance_id: InstanceId) -> Result<(), traft::error::Error> {
match expel_by_instance_id(instance_id) {
Ok(ExpelResponse {}) => Ok(()),
Err(e) => Err(traft::error::Error::Other(e)),
}
}
fn expel_by_instance_id(
instance_id: InstanceId,
) -> Result<ExpelResponse, Box<dyn std::error::Error>> {
let cluster_id = global()?
.storage
.cluster_id()?
.ok_or("cluster_id is not set yet")?;
expel(ExpelRequest {
instance_id,
cluster_id,
})
}
// NetBox entrypoint. Run on any node.
#[proc(packed_args)]
fn raft_expel(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std::error::Error>> {
expel(req)
}
// NetBox entrypoint. Must be run on the leader only. Don't call directly, use `raft_expel` instead.
#[proc(packed_args)]
fn raft_expel_on_leader(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std::error::Error>> {
expel_on_leader(req)
}
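// Resolves the current leader's address and forwards the expel request to it
// over net_box.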
fn expel(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std::error::Error>> {
let node = global()?;
let leader_id = node.status().leader_id.ok_or("leader_id not found")?;
let leader = node.storage.peers.get(&leader_id).unwrap();
let leader_address = leader.peer_address;
let fn_name = stringify_cfunc!(traft::node::raft_expel_on_leader);
match crate::tarantool::net_box_call(&leader_address, fn_name, &req, Duration::MAX) {
Ok::<traft::ExpelResponse, _>(_resp) => Ok(ExpelResponse {}),
Err(e) => Err(Box::new(e)),
}
}
fn expel_on_leader(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std::error::Error>> {
let cluster_id = global()?
.storage
.cluster_id()?
.ok_or("cluster_id is not set yet")?;
if req.cluster_id != cluster_id {
return Err(Box::new(Error::ClusterIdMismatch {
instance_cluster_id: req.cluster_id,
cluster_cluster_id: cluster_id,
}));
}
let node = global()?;
let leader_id = node.status().leader_id.ok_or("leader_id not found")?;
if node.raft_id() != leader_id {
return Err(Box::from("not a leader"));
}
let req2 = UpdatePeerRequest::new(req.instance_id, req.cluster_id)
.with_current_grade(CurrentGrade::Expelled);
node.handle_topology_request_and_wait(req2.into())?;
Ok(ExpelResponse {})
}
// NetBox entrypoint. Run on any node.
#[proc(packed_args)]
fn raft_sync_raft(req: SyncRaftRequest) -> Result<SyncRaftResponse, Box<dyn std::error::Error>> {
let deadline = Instant::now() + req.timeout;
loop {
let commit = global()?.storage.raft.commit().unwrap().unwrap();
if commit >= req.commit {
return Ok(SyncRaftResponse { commit });