"failed creating index '{}': {e}",
primary_key_def.name
);
}
match distribution {
Distribution::Global => {
// Nothing else is needed
}
Distribution::ShardedByField { .. } => {
todo!()
}
Distribution::ShardedImplicitly { .. } => {
// TODO: if primary key is not the first field or
// there's some space between key parts, we want
// bucket_id to go closer to the beginning of the tuple,
// but this will require updating the primary key part
// indexes, so somebody should do that at some point.
let bucket_id_index = last_pk_part_index + 1;
format.insert(bucket_id_index as _, ("bucket_id", SFT::Unsigned).into());
let bucket_id_def = IndexDef {
id: 1,
name: "bucket_id".into(),
space_id: id,
schema_version,
parts: vec![Part::field(bucket_id_index)
.field_type(IFT::Unsigned)
.is_nullable(false)],
operable: false,
unique: false,
// TODO: support other cases
local: true,
};
let res = self.storage.indexes.insert(&bucket_id_def);
if let Err(e) = res {
// Ignore the error for now, let governor deal with it.
tlog!(
Warning,
"failed creating index '{}': {e}",
bucket_id_def.name
);
}
}
}
let space_def = SpaceDef {
id,
name,
distribution,
schema_version,
format,
operable: false,
};
let res = self.storage.spaces.insert(&space_def);
if let Err(e) = res {
// Ignore the error for now, let governor deal with it.
tlog!(Warning, "failed creating space '{}': {e}", space_def.name);
}
}
Ddl::CreateIndex {
space_id,
index_id,
by_fields,
} => {
let _ = (space_id, index_id, by_fields);
todo!();
}
Ddl::DropSpace { id } => {
ddl_meta_space_update_operable(&self.storage, id, false)
.expect("storage shouldn't fail");
Ddl::DropIndex { index_id, space_id } => {
let _ = (index_id, space_id);
todo!();
}
}
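// Remember the DDL operation and its schema version as pending properties.
// NextSchemaVersion is bumped right away so that a subsequent DDL proposal
// picks up a fresh version number.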
self.storage
.properties
.put(PropertyName::PendingSchemaChange, &ddl)?;
self.storage
.properties
.put(PropertyName::PendingSchemaVersion, &schema_version)?;

self.storage
.properties
.put(PropertyName::NextSchemaVersion, &(schema_version + 1))?;
/// Is called during a transaction
fn handle_committed_conf_change(&mut self, entry: traft::Entry) {
let mut latch_unlock = || {
if let Some(notify) = self.joint_state_latch.take() {
let _ = notify.send(Ok(()));
event::broadcast(Event::JointStateLeave);
}
};
// Beware: a tiny difference in type names (`V2` or not `V2`)
// makes a significant difference in `entry.data` binary layout and
// in joint state transitions.
// `ConfChangeTransition::Auto` implies that `ConfChangeV2` may be
// applied in an instant without entering the joint state.
let conf_state = match entry.entry_type {
raft::EntryType::EntryConfChange => {
let mut cc = raft::ConfChange::default();
cc.merge_from_bytes(&entry.data).unwrap();
latch_unlock();
self.raw_node.apply_conf_change(&cc).unwrap()
}
raft::EntryType::EntryConfChangeV2 => {
let mut cc = raft::ConfChangeV2::default();
cc.merge_from_bytes(&entry.data).unwrap();
// Unlock the latch when either of the conditions is met:
// - conf_change will leave the joint state;
// - or it will be applied without even entering one.
let leave_joint = cc.leave_joint() || cc.enter_joint().is_none();
if leave_joint {
latch_unlock();
}
// ConfChangeTransition::Auto implies that at this
// moment raft-rs will implicitly propose another empty
// conf change that represents leaving the joint state.
self.raw_node.apply_conf_change(&cc).unwrap()
}
_ => unreachable!(),
};
self.raft_storage.persist_conf_state(&conf_state).unwrap();
}
/// Is called during a transaction
fn handle_read_states(&mut self, read_states: &[raft::ReadState]) {
for rs in read_states {
if rs.request_ctx.is_empty() {
continue;
}
let ctx = crate::unwrap_ok_or!(
traft::EntryContextNormal::from_bytes(&rs.request_ctx),
Err(e) => {
tlog!(Error, "abnormal read_state: {e}"; "read_state" => ?rs);
continue;
}
);
if let Some(notify) = self.notifications.remove(&ctx.lc) {
notify.notify_ok(rs.index);
}
}
}
/// Is called during a transaction
fn handle_messages(&mut self, messages: Vec<raft::Message>) {
if messages.is_empty() {
return;
}
self.main_loop_status("sending raft messages");
let mut sent_count = 0;
let mut skip_count = 0;
let instance_reachability = self.instance_reachability.borrow();
for msg in messages {
if msg.msg_type == raft::MessageType::MsgHeartbeat
&& !instance_reachability.should_send_heartbeat_this_tick(msg.to)
{
skip_count += 1;
continue;
}
sent_count += 1;
if let Err(e) = self.pool.send(msg) {
tlog!(Error, "{e}");
}
}
tlog!(
Debug,
"done sending messages, sent: {sent_count}, skipped: {skip_count}"
);
}
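/// Fetches the remaining snapshot chunks from the raft leader and appends
/// their space dumps to `snapshot_data` until `next_chunk_position` is `None`.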
fn fetch_chunkwise_snapshot(
&self,
snapshot_data: &mut SnapshotData,
entry_id: RaftEntryId,
) -> traft::Result<()> {
#[rustfmt::skip]
let mut position = snapshot_data.next_chunk_position
.expect("shouldn't be None if this function is called");
let space_dumps = &mut snapshot_data.space_dumps;
#[cfg(debug_assertions)]
let mut last_space_id = 0;
#[cfg(debug_assertions)]
let mut last_space_tuple_count = 0;
loop {
self.main_loop_status("receiving snapshot");
let Some(leader_id) = self.status.get().leader_id else {
tlog!(Warning, "leader id is unknown while trying to request next snapshot chunk");
return Err(Error::LeaderUnknown);
};
#[cfg(debug_assertions)]
{
let last = space_dumps.last().expect("should not be empty");
if last.space_id != last_space_id {
last_space_tuple_count = 0;
}
last_space_id = last.space_id;
let mut tuples = last.tuples.as_ref();
let count = rmp::decode::read_array_len(&mut tuples)
.expect("space dump should contain a msgpack array");
last_space_tuple_count += count;
assert_eq!(last_space_id, position.space_id);
assert_eq!(last_space_tuple_count, position.tuple_offset);
}
let req = rpc::snapshot::Request { entry_id, position };
const SNAPSHOT_CHUNK_REQUEST_TIMEOUT: Duration = Duration::from_secs(60);
tlog!(Debug, "requesting next snapshot chunk";
"entry_id" => %entry_id,
"position" => %position,
);
let fut = self
.pool
.call(&leader_id, &req, SNAPSHOT_CHUNK_REQUEST_TIMEOUT);
let fut = unwrap_ok_or!(fut,
Err(e) => {
tlog!(Warning, "failed requesting next snapshot chunk: {e}");
self.main_loop_status("error when receiving snapshot");
fiber::sleep(MainLoop::TICK * 4);
continue;
}
);
let resp = fiber::block_on(fut);
let mut resp = unwrap_ok_or!(resp,
Err(e) => {
let msg = e.to_string();
if msg.contains("read view not available") {
tlog!(Warning, "aborting snapshot retrieval: {e}");
return Err(e);
}
tlog!(Warning, "failed requesting next snapshot chunk: {e}");
self.main_loop_status("error when receiving snapshot");
fiber::sleep(MainLoop::TICK * 4);
continue;
}
);
space_dumps.append(&mut resp.snapshot_data.space_dumps);
position = unwrap_some_or!(resp.snapshot_data.next_chunk_position, {
tlog!(Debug, "received final snapshot chunk");
break;
});
}
Ok(())
}
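/// Reports the current state of the raft main loop via `self.status`
/// (and the debug log).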
#[inline(always)]
fn main_loop_status(&self, status: &'static str) {
tlog!(Debug, "main_loop_status = '{status}'");
self.status
.send_modify(|s| s.main_loop_status = status)
.expect("status shouldn't ever be borrowed across yields");
/// Processes a so-called "ready state" of the [`raft::RawNode`].
///
/// This includes:
/// - Sending messages to other instances (raft nodes);
/// - Applying committed entries;
/// - Persisting uncommitted entries;
/// - Persisting hard state (term, vote, commit);
/// - Notifying pending fibers;
///
/// See also:
///
/// - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raw_node.rs#L85>
/// - or better <https://github.com/etcd-io/etcd/blob/v3.5.5/raft/node.go#L49>
///
/// This function yields.
fn advance(&mut self, wake_governor: &mut bool, expelled: &mut bool) {
// Handle any unreachable nodes from previous iteration.
let unreachables = self
.instance_reachability
.borrow_mut()
.take_unreachables_to_report();
for raft_id in unreachables {
self.raw_node.report_unreachable(raft_id);
// TODO: remove infos when instances are expelled.
let Some(pr) = self.raw_node.raft.mut_prs().get_mut(raft_id) else {
continue;
};
// NOTE: Raft-rs will not check if the node should be paused until
// a new raft entry is appended to the log. This means that once an
// instance goes silent it will still be bombarded with heartbeats
// until someone proposes an operation. This is a workaround for
// that particular case.
// The instance's state would've only changed if it was not in the
// Snapshot state, so we have to check for that.
if pr.state == ::raft::ProgressState::Probe {
pr.pause();
}
}
// Get the `Ready` with `RawNode::ready` interface.
if !self.raw_node.has_ready() {
return;
}
let mut ready: raft::Ready = self.raw_node.ready();
// Apply soft state changes before anything else, so that this info is
// available to other fibers as soon as the main loop yields.
if let Some(ss) = ready.ss() {
self.status
.send_modify(|s| {
s.leader_id = (ss.leader_id != INVALID_ID).then_some(ss.leader_id);
s.raft_state = ss.raft_state.into();
})
.expect("status shouldn't ever be borrowed across yields");
}
// Send out messages to the other nodes.
self.handle_messages(ready.take_messages());
// Handle read states before applying snapshot which may fail.
self.handle_read_states(ready.read_states());
// If this is a snapshot, it needs to be applied first.
let snapshot = ready.snapshot();
let snapshot_data = (|| -> Option<SnapshotData> {
if snapshot.is_empty() {
return None;
}
let snapshot_data = crate::unwrap_ok_or!(
SnapshotData::decode(snapshot.get_data()),
Err(e) => {
tlog!(Warning, "skipping snapshot, which failed to deserialize: {e}");
return None;
}
);
let v_snapshot = snapshot_data.schema_version;
loop {
let v_local = local_schema_version().expect("storage shouldn't fail");
let v_global = self
.storage
.properties
.global_schema_version()
.expect("storage shouldn't fail");
assert!(
v_global <= v_local,
"global schema version is only ever increased after local"
);
assert!(
v_global <= v_snapshot,
"global schema version updates are distributed via raft"
);
if v_local > v_snapshot {
tlog!(
Warning,
"skipping stale snapshot: local schema version: {}, snapshot schema version: {}",
v_local,
snapshot_data.schema_version,
);
return None;
}
if !self.is_readonly() {
// Replicaset leader applies the schema changes directly.
return Some(snapshot_data);
}
if v_local == v_snapshot {
// Replicaset follower has synced schema with the leader,
// now global space dumps should be handled.
return Some(snapshot_data);
}
self.main_loop_status("awaiting replication");
// Replicaset follower needs to sync with leader via tarantool
// replication.
let timeout = MainLoop::TICK * 4;
fiber::sleep(timeout);
}
})();
if let Some(mut snapshot_data) = snapshot_data {
if snapshot_data.next_chunk_position.is_some() {
self.main_loop_status("receiving snapshot");
let entry_id = RaftEntryId {
index: snapshot.get_metadata().index,
term: snapshot.get_metadata().term,
};
if let Err(e) = self.fetch_chunkwise_snapshot(&mut snapshot_data, entry_id) {
// Error has been logged.
_ = e;
tlog!(Warning, "dropping snapshot data");
return;
}
}
self.main_loop_status("applying snapshot");
if let Err(e) = transaction(|| -> traft::Result<()> {
let meta = snapshot.get_metadata();
self.raft_storage.handle_snapshot_metadata(meta)?;
// FIXME: apply_snapshot_data calls truncate on clusterwide
// spaces and even though they're all local spaces doing
// truncate on them is not allowed on read_only instances.
// Related issue in tarantool:
// https://github.com/tarantool/tarantool/issues/5616
let is_readonly = self.is_readonly();
if is_readonly {
crate::tarantool::eval("box.cfg { read_only = false }")?;
}
let res = self
.storage
.apply_snapshot_data(&snapshot_data, !is_readonly);
if is_readonly {
crate::tarantool::exec("box.cfg { read_only = true }")?;
}
#[allow(clippy::let_unit_value)]
let _ = res?;
// TODO: As long as the snapshot was sent to us in response to
// a rejected MsgAppend (which is the only possible case
// currently), we will send a MsgAppendResponse back which will
// automatically reset our status from Snapshot to Replicate.
// But when we implement support for manual snapshot requests,
// we will have to also implement sending a MsgSnapStatus,
// to reset our status explicitly, to avoid the leader ignoring us
// indefinitely after that point.
Ok(())
}) {
tlog!(Warning, "dropping raft ready: {ready:#?}");
panic!("transaction failed: {e}, {}", TarantoolError::last());
}
}
// Apply committed entries.
let res = self.handle_committed_entries(ready.committed_entries(), wake_governor, expelled);
if let Err(e) = res {
tlog!(Warning, "dropping raft ready: {ready:#?}");
panic!("transaction failed: {e}, {}", TarantoolError::last());
}
if let Err(e) = transaction(|| -> Result<(), &str> {
if !ready.entries().is_empty() || ready.hs().is_some() {
self.main_loop_status("persisting entries");
}
// Persist uncommitted entries in the raft log.
self.raft_storage.persist_entries(ready.entries()).unwrap();
// Raft HardState changed, and we need to persist it.
if let Some(hs) = ready.hs() {
self.raft_storage.persist_hard_state(hs).unwrap();
self.status
.send_modify(|s| s.term = hs.term)
.expect("status shouldn't ever be borrowed across yields");
}
Ok(())
}) {
tlog!(Warning, "dropping raft ready: {ready:#?}");
panic!("transaction failed: {e}, {}", TarantoolError::last());
}
// This bunch of messages is special. It must be sent only
// AFTER the HardState, Entries and Snapshot are persisted
// to the stable storage.
self.handle_messages(ready.take_persisted_messages());
// Advance the Raft.
let mut light_rd = self.raw_node.advance(ready);
// Send out messages to the other nodes.
self.handle_messages(light_rd.take_messages());
// Update commit index.
if let Some(commit) = light_rd.commit_index() {
self.raft_storage.persist_commit(commit).unwrap();
}
// Apply committed entries.
let res =
self.handle_committed_entries(light_rd.committed_entries(), wake_governor, expelled);
if let Err(e) = res {
tlog!(Warning, "dropping raft light ready: {light_rd:#?}");
panic!("transaction failed: {e}, {}", TarantoolError::last());
// Advance the apply index.
self.raw_node.advance_apply();
self.main_loop_status("idle");
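/// Compares the local vclock against the replicaset master's and blocks the
/// raft main loop for a few ticks if local replication is lagging behind.
/// Returns an error if called on the replicaset master itself.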
fn check_vclock_and_sleep(&mut self) -> traft::Result<()> {
assert!(self.raw_node.raft.state != RaftStateRole::Leader);
let my_id = self.raw_node.raft.id;
let my_instance_info = self.storage.instances.get(&my_id)?;
let replicaset_id = my_instance_info.replicaset_id;
let replicaset = self.storage.replicasets.get(&replicaset_id)?;
let replicaset = replicaset.ok_or_else(|| {
Error::other(format!("replicaset info for id {replicaset_id} not found"))
})?;
if replicaset.master_id == my_instance_info.instance_id {
return Err(Error::other(
"check_vclock_and_sleep called on replicaset master",
));
}
let master = self.storage.instances.get(&replicaset.master_id)?;
let master_vclock = fiber::block_on(sync::call_get_vclock(&self.pool, &master.raft_id))?;
let local_vclock = Vclock::current();
if matches!(
local_vclock.partial_cmp(&master_vclock),
None | Some(Ordering::Less)
) {
tlog!(Info, "blocking raft loop until replication progresses";
"master_vclock" => ?master_vclock,
"local_vclock" => ?local_vclock,
);
fiber::sleep(MainLoop::TICK * 4);
}
Ok(())
}
/// Check if this is a read only replica. This function is called when we
/// need to determine if this instance should be changing the schema
/// definition or if it should instead synchronize with a master.
///
/// Note: it would be a little more reliable to check if the replica is
/// chosen to be a master by checking master_id in _pico_replicaset, but
/// currently we cannot do that, because tarantool replication is being
/// done asynchronously with respect to raft log replication. Basically, an
/// instance needs to know it's a replicaset master before it can access the
/// replicaset info.
fn is_readonly(&self) -> bool {
let is_ro: bool = crate::tarantool::eval("return box.info.ro")
.expect("checking read-onlyness should never fail");
is_ro
}
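/// Drops notification senders whose receiving ends have already been closed.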
#[inline]
fn cleanup_notifications(&mut self) {
self.notifications.retain(|_, notify| !notify.is_closed());
}
/// Generates a pair of a logical clock and a notification channel.
/// The logical clock is a unique identifier suitable for tagging
/// entries in the raft log. A notification is broadcast when the
/// corresponding entry is committed.
#[inline]
fn schedule_notification(&mut self) -> (LogicalClock, Notify) {
let (tx, rx) = notification();
let lc = {
self.lc.inc();
self.lc
};
self.notifications.insert(lc, tx);
(lc, rx)
}
}
/// Return value of [`NodeImpl::handle_committed_normal_entry`]; explains what should be
/// done as a result of attempting to apply a given entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ApplyEntryResult {
/// This entry failed to apply for some reason, and must be retried later.
SleepAndRetry,
/// Entry applied successfully, proceed to next entry.
EntryApplied,
}
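/// Fiber that drives the raft node: it wakes up on `loop_waker` (or at least
/// once per [`Self::TICK`]) and advances the raft state machine.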
pub(crate) struct MainLoop {
_loop: Option<fiber::JoinHandle<'static, ()>>,
loop_waker: watch::Sender<()>,
stop_flag: Rc<Cell<bool>>,
}
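/// State owned by the main loop fiber. `loop_waker` wakes the loop up early,
/// `stop_flag` makes the next iteration break out of the loop.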
struct MainLoopState {
node_impl: Rc<Mutex<NodeImpl>>,
next_tick: Instant, // deadline for the next raft tick
loop_waker: watch::Receiver<()>,
stop_flag: Rc<Cell<bool>>,
}
impl MainLoop {
pub const TICK: Duration = Duration::from_millis(100);
fn start(node_impl: Rc<Mutex<NodeImpl>>) -> Self {
let (loop_waker_tx, loop_waker_rx) = watch::channel(());
let stop_flag: Rc<Cell<bool>> = Default::default();
let state = MainLoopState {
node_impl,
next_tick: Instant::now(),
loop_waker: loop_waker_rx,
stop_flag: stop_flag.clone(),
};
Self {
// implicit yield
_loop: loop_start!("raft_main_loop", Self::iter_fn, state),
loop_waker: loop_waker_tx,
stop_flag,
}
}
pub fn wakeup(&self) {
let _ = self.loop_waker.send(());
}
async fn iter_fn(state: &mut MainLoopState) -> FlowControl {
let _ = state.loop_waker.changed().timeout(Self::TICK).await;
if state.stop_flag.take() {
return FlowControl::Break;
}
// FIXME: potential deadlock - can't use sync mutex in async fn
let mut node_impl = state.node_impl.lock(); // yields
if state.stop_flag.take() {
return FlowControl::Break;
}
node_impl.cleanup_notifications();
// Tick the raft node if at least TICK time has passed since the last tick
// (raft election/heartbeat timeouts are counted in these ticks).
let now = Instant::now();
if now > state.next_tick {
state.next_tick = now.saturating_add(Self::TICK);
node_impl.raw_node.tick();
}
let mut wake_governor = false;
let mut expelled = false;
node_impl.advance(&mut wake_governor, &mut expelled); // yields
drop(node_impl);
if state.stop_flag.take() {
return FlowControl::Break;
}
if expelled {
crate::tarantool::exit(0);
}
if wake_governor {
if let Err(e) = async { global()?.governor_loop.wakeup() }.await {
tlog!(Warning, "failed waking up governor: {e}");
}
}
FlowControl::Continue
}
}
impl Drop for MainLoop {
fn drop(&mut self) {
self.stop_flag.set(true);
let _ = self.loop_waker.send(());
}
}
static mut RAFT_NODE: Option<Box<Node>> = None;
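/// Stores the node in the global static so it can later be accessed via [`global`].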
pub fn set_global(node: Node) {
unsafe {
assert!(
RAFT_NODE.is_none(),
"discovery::set_global() called twice, it's a leak"
);
RAFT_NODE = Some(Box::new(node));
}
}
pub fn global() -> traft::Result<&'static Node> {
// Uninitialized raft node is a regular case. This case may take
// place while the instance is executing the `start_discover()` function.
// It has already started listening, but the node is only initialized
// in `postjoin()`.
unsafe { RAFT_NODE.as_deref() }.ok_or(Error::Uninitialized)
}
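/// RPC handler: feeds raft messages received from other instances into the
/// local raft node.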
fn proc_raft_interact(pbs: Vec<traft::MessagePb>) -> traft::Result<()> {
let node = global()?;
for pb in pbs {
node.step_and_yield(raft::Message::try_from(pb).map_err(Error::other)?);
}
Ok(())
}