Skip to content
Snippets Groups Projects
Commit 62b88a08 authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon: Committed by Georgy Moshkin
Browse files

fix: check leader_id & term in replication & sharding stored procs

parent 4ee7d1e8
No related branches found
No related tags found
1 merge request!299Feat/poor mans vshard
use crate::traft::InstanceId;
use crate::traft::RaftId;
use crate::traft::{RaftId, RaftTerm};
use ::tarantool::tlua::LuaError;
use raft::StorageError;
use rmp_serde::decode::Error as RmpDecodeError;
......@@ -29,8 +29,13 @@ pub enum Error {
instance_rsid: String,
requested_rsid: String,
},
#[error("operation request from non leader {actual}, current leader is {expected}")]
LeaderIdMismatch { expected: RaftId, actual: RaftId },
#[error("operation request from non leader {requested}, current leader is {current}")]
LeaderIdMismatch { requested: RaftId, current: RaftId },
#[error("operation request from different term {requested}, current term is {current}")]
TermMismatch {
requested: RaftTerm,
current: RaftTerm,
},
#[error("error during execution of lua code: {0}")]
Lua(#[from] LuaError),
#[error("{0}")]
......
......@@ -50,6 +50,7 @@ use crate::traft::event::Event;
use crate::traft::failover;
use crate::traft::notify::Notify;
use crate::traft::rpc::sharding::cfg::ReplicasetWeights;
use crate::traft::rpc::LeaderWithTerm;
use crate::traft::rpc::{replication, sharding};
use crate::traft::storage::{State, StateKey};
use crate::traft::ConnectionPool;
......@@ -1125,6 +1126,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
.iter()
.cloned()
.zip(repeat(replication::Request {
leader_and_term: LeaderWithTerm { leader_id, term },
replicaset_instances: replicaset_iids.clone(),
replicaset_id: replicaset_id.clone(),
// TODO: what if someone goes offline/expelled?
......@@ -1221,8 +1223,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
(
peer.instance_id.clone(),
sharding::Request {
leader_id,
term,
leader_and_term: LeaderWithTerm { leader_id, term },
bootstrap: !vshard_bootstrapped && peer.raft_id == leader_id,
..Default::default()
},
......@@ -1282,8 +1283,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
(|| -> Result<(), Error> {
let peer_ids = maybe_responding(&peers).map(|peer| peer.instance_id.clone());
let reqs = peer_ids.zip(repeat(sharding::Request {
leader_id,
term,
leader_and_term: LeaderWithTerm { leader_id, term },
weights: Some(new_weights.clone()),
..Default::default()
}));
......
use crate::traft::error::Error;
use crate::traft::node::Status;
use crate::traft::{RaftId, RaftTerm};
use ::tarantool::tuple::{DecodeOwned, Encode};
use std::fmt::Debug;
......@@ -7,6 +11,33 @@ use serde::de::DeserializeOwned;
pub mod replication;
pub mod sharding;
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct LeaderWithTerm {
pub leader_id: RaftId,
pub term: RaftTerm,
}
impl LeaderWithTerm {
/// Check if requested `self.leader_id` and `self.term`
/// match the current ones from `status`.
pub fn check(&self, status: &Status) -> Result<(), Error> {
let status_leader_id = status.leader_id.ok_or(Error::LeaderUnknown)?;
if self.leader_id != status_leader_id {
return Err(Error::LeaderIdMismatch {
requested: self.leader_id,
current: status_leader_id,
});
}
if self.term != status.term {
return Err(Error::TermMismatch {
requested: self.term,
current: status.term,
});
}
Ok(())
}
}
/// Types implementing this trait represent an RPC's (remote procedure call)
/// arguments. This trait contains information about the request.
pub trait Request: Encode + DecodeOwned {
......
......@@ -11,6 +11,7 @@ use crate::InstanceId;
#[proc(packed_args)]
fn proc_replication(req: Request) -> Result<Response, Error> {
let node = node::global()?;
req.leader_and_term.check(&node.status())?;
let peer_storage = &node.storage.peers;
let this_rsid = peer_storage.peer_field::<ReplicasetId>(&node.raft_id())?;
let mut peer_addresses = Vec::with_capacity(req.replicaset_instances.len());
......@@ -37,6 +38,7 @@ fn proc_replication(req: Request) -> Result<Response, Error> {
/// Request to configure tarantool replication.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct Request {
pub leader_and_term: super::LeaderWithTerm,
pub replicaset_instances: Vec<InstanceId>,
pub replicaset_id: String,
pub promote: bool,
......
use ::tarantool::{proc, tlua};
use crate::traft::{error::Error, node, RaftId, RaftTerm};
use crate::traft::{error::Error, node};
#[proc(packed_args)]
fn proc_sharding(req: Request) -> Result<Response, Error> {
let node = node::global()?;
let leader_id = node.status().leader_id.ok_or(Error::LeaderUnknown)?;
if req.leader_id != leader_id {
return Err(Error::LeaderIdMismatch {
expected: leader_id,
actual: req.leader_id,
});
}
// TODO: check term matches
let _ = req.term;
req.leader_and_term.check(&node.status())?;
let storage = &node.storage;
let cfg = if let Some(weights) = req.weights {
......@@ -46,8 +38,7 @@ fn proc_sharding(req: Request) -> Result<Response, Error> {
/// Request to configure vshard.
#[derive(Clone, Default, Debug, serde::Serialize, serde::Deserialize)]
pub struct Request {
pub leader_id: RaftId,
pub term: RaftTerm,
pub leader_and_term: super::LeaderWithTerm,
pub weights: Option<cfg::ReplicasetWeights>,
pub bootstrap: bool,
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment