mod.rs 10.66 KiB
//! Compatibility layer between Tarantool and `raft-rs`.
mod error;
mod network;
pub mod node;
mod storage;
pub mod topology;
use ::raft::prelude as raft;
use ::tarantool::tuple::AsTuple;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::convert::TryFrom;
use uuid::Uuid;
use protobuf::Message as _;
use protobuf::ProtobufEnum as _;
pub use network::ConnectionPool;
pub use storage::Storage;
pub use topology::Topology;
pub type RaftId = u64;
//////////////////////////////////////////////////////////////////////////////////////////
/// Timestamps for raft entries.
///
/// Logical clock provides a cheap and easy way for generating globally unique identifiers.
///
/// - `count` is a simple in-memory counter. It's cheap to increment because it's volatile.
/// - `gen` should be persisted upon LogicalClock initialization to ensure the uniqueness.
/// - `id` corresponds to `raft_id` of the instance (that is already unique across nodes).
#[derive(Clone, Debug, Default, Serialize, Deserialize, Hash, PartialEq, Eq)]
pub struct LogicalClock {
id: u64,
gen: u64,
count: u64,
}
impl LogicalClock {
pub fn new(id: u64, gen: u64) -> Self {
Self { id, gen, count: 0 }
}
pub fn inc(&mut self) {
self.count += 1;
}
}
//////////////////////////////////////////////////////////////////////////////////////////
/// The operation on the raft state machine.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "kind")]
pub enum Op {
/// No operation.
Nop,
/// Print the message in tarantool log.
Info { msg: String },
/// Evaluate the code on every instance in cluster.
EvalLua { code: String },
}
//////////////////////////////////////////////////////////////////////////////////////////
/// Serializable struct representing a member of the raft group.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct Peer {
/// Used for identifying raft nodes.
/// Must be unique in the raft group.
pub raft_id: u64,
/// Inbound address used for communication with the node.
/// Not to be confused with listen address.
pub peer_address: String,
/// Reflects the role of the node in the raft group.
/// Non-voters are also called learners in terms of raft.
pub voter: bool,
pub instance_id: String,
pub replicaset_id: String,
pub instance_uuid: String,
pub replicaset_uuid: String,
/// `0` means it's not committed yet.
pub commit_index: u64,
}
impl AsTuple for Peer {}
impl Peer {}
//////////////////////////////////////////////////////////////////////////////////////////
/// Serializable representation of `raft::prelude::Entry`.
///
/// See correspondig definition in `raft-rs`:
/// - <https://github.com/tikv/raft-rs/blob/v0.6.0/proto/proto/eraftpb.proto#L23>
///
#[derive(Clone, Serialize, Deserialize, PartialEq)]
pub struct Entry {
/// ```
/// enum EntryType {
/// EntryNormal = 0;
/// EntryConfChange = 1;
/// EntryConfChangeV2 = 2;
/// }
/// ```
pub entry_type: i32,
pub index: u64,
pub term: u64,
/// Corresponding `entry.data`. Solely managed by `raft-rs`.
#[serde(with = "serde_bytes")]
pub data: Vec<u8>,
/// Corresponding `entry.payload`. Managed by the Picodata.
pub context: Option<EntryContext>,
}
impl std::fmt::Debug for Entry {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("Entry")
.field(
"entry_type",
&raft::EntryType::from_i32(self.entry_type).ok_or(self.entry_type),
)
.field("index", &self.index)
.field("term", &self.term)
.field("data", &self.data)
.field("context", &self.context)
.finish()
}
}
/// Raft entry payload specific to the Picodata.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum EntryContext {
Normal(EntryContextNormal),
ConfChange(EntryContextConfChange),
}
/// [`EntryContext`] of a normal entry.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct EntryContextNormal {
pub lc: LogicalClock,
pub op: Op,
}
/// [`EntryContext`] of a conf change entry, either `EntryConfChange` or `EntryConfChangeV2`
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct EntryContextConfChange {
pub peers: Vec<Peer>,
}
impl AsTuple for Entry {}
impl ContextCoercion for EntryContextNormal {}
impl ContextCoercion for EntryContextConfChange {}
impl Entry {
/// Returns the logical clock value if it's an `EntryNormal`.
fn lc(&self) -> Option<&LogicalClock> {
match &self.context {
Some(EntryContext::Normal(v)) => Some(&v.lc),
Some(EntryContext::ConfChange(_)) => None,
None => None,
}
}
/// Returns the contained `Op` if it's an `EntryNormal`.
fn op(&self) -> Option<&Op> {
match &self.context {
Some(EntryContext::Normal(v)) => Some(&v.op),
Some(EntryContext::ConfChange(_)) => None,
None => None,
}
}
/// Returns the iterator over contained `Vec<Peer>` if it's an `EntryConfChange`.
fn iter_peers(&self) -> std::slice::Iter<'_, Peer> {
match &self.context {
Some(EntryContext::ConfChange(v)) => v.peers.iter(),
_ => (&[]).iter(),
}
}
}
impl EntryContext {
fn from_bytes_normal(bytes: &[u8]) -> Result<Option<Self>, error::CoercionError> {
match EntryContextNormal::read_from_bytes(bytes)? {
Some(v) => Ok(Some(Self::Normal(v))),
None => Ok(None),
}
}
fn from_bytes_conf_change(bytes: &[u8]) -> Result<Option<Self>, error::CoercionError> {
match EntryContextConfChange::read_from_bytes(bytes)? {
Some(v) => Ok(Some(Self::ConfChange(v))),
None => Ok(None),
}
}
fn write_to_bytes(ctx: Option<&Self>) -> Vec<u8> {
match ctx {
None => vec![],
Some(Self::Normal(v)) => v.to_bytes(),
Some(Self::ConfChange(v)) => v.to_bytes(),
}
}
}
impl TryFrom<&raft::Entry> for self::Entry {
type Error = error::CoercionError;
fn try_from(e: &raft::Entry) -> Result<Self, Self::Error> {
let ret = Self {
entry_type: e.entry_type.value(),
index: e.index,
term: e.term,
data: Vec::from(e.get_data()),
context: match e.entry_type {
raft::EntryType::EntryNormal => EntryContext::from_bytes_normal(&e.context)?,
raft::EntryType::EntryConfChange | raft::EntryType::EntryConfChangeV2 => {
EntryContext::from_bytes_conf_change(&e.context)?
}
},
};
Ok(ret)
}
}
impl TryFrom<self::Entry> for raft::Entry {
type Error = error::CoercionError;
fn try_from(row: self::Entry) -> Result<raft::Entry, Self::Error> {
let ret = raft::Entry {
entry_type: raft::EntryType::from_i32(row.entry_type)
.ok_or(Self::Error::UnknownEntryType(row.entry_type))?,
index: row.index,
term: row.term,
data: row.data.into(),
context: EntryContext::write_to_bytes(row.context.as_ref()).into(),
..Default::default()
};
Ok(ret)
}
}
///////////////////////////////////////////////////////////////////////////////
/// A wrapper for `raft::prelude::Message` already serialized with a protobuf.
///
/// This struct is used for passing `raft::prelude::Message`
/// over Tarantool binary protocol (`net_box`).
#[derive(Clone, Deserialize, Serialize)]
struct MessagePb(#[serde(with = "serde_bytes")] Vec<u8>);
impl AsTuple for MessagePb {}
impl ::std::fmt::Debug for MessagePb {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
f.debug_struct("MessagePb").finish_non_exhaustive()
}
}
impl From<raft::Message> for self::MessagePb {
fn from(m: raft::Message) -> Self {
Self(m.write_to_bytes().expect("that's a bug"))
}
}
impl TryFrom<self::MessagePb> for raft::Message {
type Error = protobuf::ProtobufError;
fn try_from(pb: self::MessagePb) -> Result<raft::Message, Self::Error> {
let mut ret = raft::Message::default();
ret.merge_from_bytes(&pb.0)?;
Ok(ret)
}
}
///////////////////////////////////////////////////////////////////////////////
/// This trait allows converting `EntryContext` to / from `Vec<u8>`.
pub trait ContextCoercion: Serialize + DeserializeOwned {
fn read_from_bytes(bytes: &[u8]) -> Result<Option<Self>, error::CoercionError> {
match bytes {
bytes if bytes.is_empty() => Ok(None),
bytes => Ok(Some(rmp_serde::from_read_ref(bytes)?)),
}
}
fn write_to_bytes(ctx: Option<&Self>) -> Vec<u8> {
match ctx {
None => vec![],
Some(ctx) => rmp_serde::to_vec_named(ctx).unwrap(),
}
}
fn to_bytes(&self) -> Vec<u8> {
ContextCoercion::write_to_bytes(Some(self))
}
}
///////////////////////////////////////////////////////////////////////////////
/// Request to join the cluster.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct JoinRequest {
pub cluster_id: String,
pub instance_id: String,
pub replicaset_id: Option<String>,
pub advertise_address: String,
pub voter: bool,
}
impl AsTuple for JoinRequest {}
///////////////////////////////////////////////////////////////////////////////
/// Response to a JoinRequest
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct JoinResponse {
pub peer: Peer,
pub raft_group: Vec<Peer>,
pub box_replication: Vec<String>,
// TODO add later:
// Other parameters necessary for box.cfg()
// pub read_only: bool,
}
impl AsTuple for JoinResponse {}
///////////////////////////////////////////////////////////////////////////////
lazy_static::lazy_static! {
static ref NAMESPACE_INSTANCE_UUID: Uuid =
Uuid::new_v3(&Uuid::nil(), "INSTANCE_UUID".as_bytes());
static ref NAMESPACE_REPLICASET_UUID: Uuid =
Uuid::new_v3(&Uuid::nil(), "REPLICASET_UUID".as_bytes());
}
/// Generate UUID for an instance from `instance_id` (String).
/// Use Version-3 (MD5) UUID.
pub fn instance_uuid(instance_id: &str) -> String {
let uuid = Uuid::new_v3(&NAMESPACE_INSTANCE_UUID, instance_id.as_bytes());
uuid.hyphenated().to_string()
}
/// Generate UUID for a replicaset from `replicaset_id` (String).
/// Use Version-3 (MD5) UUID.
pub fn replicaset_uuid(replicaset_id: &str) -> String {
let uuid = Uuid::new_v3(&NAMESPACE_REPLICASET_UUID, replicaset_id.as_bytes());
uuid.hyphenated().to_string()
}