// This condition means schema versions must always increase,
// even after a DdlAbort.
if v_local == v_pending {
if self.is_readonly() {
// Wait for tarantool replication with the master to progress.
return SleepAndRetry;
} else {
let v_global = storage_properties
.global_schema_version()
.expect("storage should not fail");
ddl_abort_on_master(&ddl, v_global).expect("storage should not fail");
}
}
match ddl {
Ddl::CreateTable { id, .. } => {
ddl_meta_drop_space(&self.storage, id).expect("storage shouldn't fail");
}
Ddl::DropTable { id, .. } => {
ddl_meta_space_update_operable(&self.storage, id, true)
.expect("storage shouldn't fail");
}
_ => {
todo!()
}
}
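// The aborted DDL has been handled, so clear the pending schema change records.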
storage_properties
.delete(PropertyName::PendingSchemaChange)
.expect("storage should not fail");
storage_properties
.delete(PropertyName::PendingSchemaVersion)
.expect("storage should not fail");
let v_local = local_schema_version().expect("storage should not fail");
let v_pending = acl.schema_version();
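// Apply the ACL change to the local schema if it hasn't been applied yet:
// the master applies it directly, while a read-only replica waits for
// tarantool replication to deliver it.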
if v_local < v_pending {
if self.is_readonly() {
// Wait for tarantool replication with master to progress.
return SleepAndRetry;
} else {
match &acl {
Acl::CreateUser { user_def } => {
acl::on_master_create_user(user_def)
.expect("creating user shouldn't fail");
}
Acl::ChangeAuth { user_id, auth, .. } => {
acl::on_master_change_user_auth(*user_id, auth)
.expect("changing user auth shouldn't fail");
}
Acl::DropUser { user_id, .. } => {
acl::on_master_drop_user(*user_id)
.expect("dropping user shouldn't fail");
}
Acl::CreateRole { role_def } => {
acl::on_master_create_role(role_def)
.expect("creating role shouldn't fail");
}
Acl::DropRole { role_id, .. } => {
acl::on_master_drop_role(*role_id)
.expect("droping role shouldn't fail");
}
Acl::GrantPrivilege { priv_def } => {
acl::on_master_grant_privilege(priv_def)
.expect("granting a privilege shouldn't fail");
}
Acl::RevokePrivilege { priv_def, .. } => {
acl::on_master_revoke_privilege(priv_def)
.expect("revoking a privilege shouldn't fail");
}
}
set_local_schema_version(v_pending).expect("storage should not fail");
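// Persist the ACL change in picodata's global metadata tables.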
match &acl {
Acl::CreateUser { user_def } => {
acl::global_create_user(&self.storage, user_def)
.expect("persisting a user definition shouldn't fail");
}
Acl::ChangeAuth {
user_id,
auth,
initiator,
..
} => {
acl::global_change_user_auth(&self.storage, *user_id, auth, *initiator)
.expect("changing user definition shouldn't fail");
}
Acl::DropUser {
user_id, initiator, ..
} => {
acl::global_drop_user(&self.storage, *user_id, *initiator)
.expect("droping a user definition shouldn't fail");
}
Acl::CreateRole { role_def } => {
acl::global_create_role(&self.storage, role_def)
.expect("persisting a role definition shouldn't fail");
}
Acl::DropRole {
role_id, initiator, ..
} => {
acl::global_drop_role(&self.storage, *role_id, *initiator)
.expect("droping a role definition shouldn't fail");
}
Acl::GrantPrivilege { priv_def } => {
acl::global_grant_privilege(&self.storage, priv_def)
.expect("persiting a privilege definition shouldn't fail");
}
Acl::RevokePrivilege {
priv_def,
initiator,
} => {
acl::global_revoke_privilege(&self.storage, priv_def, *initiator)
.expect("removing a privilege definition shouldn't fail");
}
}
storage_properties
.put(PropertyName::GlobalSchemaVersion, &v_pending)
.expect("storage should not fail");
storage_properties
.put(PropertyName::NextSchemaVersion, &(v_pending + 1))
.expect("storage should not fail");
if let Some(lc) = &lc {
if let Some(notify) = self.notifications.remove(lc) {
notify.notify_ok_any(result);
}
}
if let Some(notify) = self.joint_state_latch.take_or_keep(&index) {
// The entry was expected to be a ConfChange, but it's a normal
// entry instead. Raft must have overridden it, or there was
// a re-election.
let e = RaftError::ConfChangeError("rolled back".into());
let _ = notify.send(Err(e));
}
EntryApplied
fn apply_op_ddl_prepare(&self, ddl: Ddl, schema_version: u64) -> traft::Result<()> {
debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
match ddl.clone() {
Ddl::CreateTable {
id,
name,
mut format,
mut primary_key,
use ::tarantool::util::NumOrStr::*;
let mut last_pk_part_index = 0;
for pk_part in &mut primary_key {
let (index, field) = match &pk_part.field {
Num(index) => {
if *index as usize >= format.len() {
// Ddl prepare operations should be verified before being proposed,
// so this shouldn't ever happen. But ignoring this is safe anyway,
// because proc_apply_schema_change will catch the error and ddl will be aborted.
tlog!(
Warning,
"invalid primary key part: field index {index} is out of bound"
);
continue;
}
(*index, &format[*index as usize])
}
Str(name) => {
let field_index = format.iter().zip(0..).find(|(f, _)| f.name == *name);
let Some((field, index)) = field_index else {
// Ddl prepare operations should be verified before being proposed,
// so this shouldn't ever happen. But ignoring this is safe anyway,
// because proc_apply_schema_change will catch the error and ddl will be aborted.
tlog!(
Warning,
"invalid primary key part: field '{name}' not found"
);
continue;
};
// We store all index parts as field indexes.
pk_part.field = Num(index);
(index, field)
}
};
let Some(field_type) =
crate::schema::try_space_field_type_to_index_field_type(field.field_type)
else {
// Ddl prepare operations should be verified before being proposed,
// so this shouldn't ever happen. But ignoring this is safe anyway,
// because proc_apply_schema_change will catch the error and ddl will be aborted.
tlog!(
Warning,
"invalid primary key part: field type {} cannot be part of an index",
field.field_type
);
continue;
};
// We overwrite the one provided in the request because
// there's no reason for it to be there, we know the type
// right here.
pk_part.r#type = Some(field_type);
pk_part.is_nullable = Some(field.is_nullable);
last_pk_part_index = last_pk_part_index.max(index);
}
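// The primary key always gets index id 0 and starts out non-operable until
// the DDL operation is committed.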
let primary_key_def = IndexDef {
id: 0,
name: "primary_key".into(),
schema_version,
parts: primary_key,
operable: false,
// TODO: support other cases
local: true,
};
let res = self.storage.indexes.insert(&primary_key_def);
if let Err(e) = res {
// Ignore the error for now, let governor deal with it.
tlog!(
Warning,
"failed creating index '{}': {e}",
primary_key_def.name
);
}
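// The distribution determines what else must be created: implicitly sharded
// tables get an extra bucket_id field plus an index over it.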
match distribution {
Distribution::Global => {
// Nothing else is needed
}
Distribution::ShardedByField { .. } => {
todo!()
}
Distribution::ShardedImplicitly { .. } => {
// TODO: if primary key is not the first field or
// there's some space between key parts, we want
// bucket_id to go closer to the beginning of the tuple,
// but this will require to update primary key part
// indexes, so somebody should do that at some point.
let bucket_id_index = last_pk_part_index + 1;
format.insert(bucket_id_index as _, ("bucket_id", SFT::Unsigned).into());
let bucket_id_def = IndexDef {
id: 1,
name: "bucket_id".into(),
schema_version,
parts: vec![Part::field(bucket_id_index)
.field_type(IFT::Unsigned)
.is_nullable(false)],
operable: false,
unique: false,
// TODO: support other cases
local: true,
};
let res = self.storage.indexes.insert(&bucket_id_def);
if let Err(e) = res {
// Ignore the error for now, let governor deal with it.
tlog!(
Warning,
"failed creating index '{}': {e}",
bucket_id_def.name
);
}
}
}
let space_def = TableDef {
id,
name,
distribution,
schema_version,
format,
operable: false,
};
let res = self.storage.tables.insert(&space_def);
if let Err(e) = res {
// Ignore the error for now, let governor deal with it.
tlog!(Warning, "failed creating space '{}': {e}", space_def.name);
}
Ddl::CreateIndex {
space_id,
index_id,
by_fields,
} => {
let _ = (space_id, index_id, by_fields);
todo!();
}
Ddl::DropTable { id, .. } => {
ddl_meta_space_update_operable(&self.storage, id, false)
.expect("storage shouldn't fail");
}
Ddl::DropIndex { index_id, space_id } => {
let _ = (index_id, space_id);
todo!();
}
}
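// Persist the pending schema change so that it can later be committed or aborted.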
self.storage
.properties
.put(PropertyName::PendingSchemaChange, &ddl)?;
self.storage
.properties
.put(PropertyName::PendingSchemaVersion, &schema_version)?;

self.storage
.properties
.put(PropertyName::NextSchemaVersion, &(schema_version + 1))?;
/// Is called during a transaction
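///
/// Applies a committed raft configuration change entry: unlocks the joint
/// state latch when appropriate, applies the change to the raw raft node and
/// persists the resulting conf state.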
fn handle_committed_conf_change(&mut self, entry: traft::Entry) {
let mut latch_unlock = || {
if let Some(notify) = self.joint_state_latch.take() {
let _ = notify.send(Ok(()));
};
// Beware: a tiny difference in type names (`V2` or not `V2`)
// makes a significant difference in `entry.data` binary layout and
// in joint state transitions.
// `ConfChangeTransition::Auto` implies that `ConfChangeV2` may be
// applied in an instant without entering the joint state.
let conf_state = match entry.entry_type {
raft::EntryType::EntryConfChange => {
let mut cc = raft::ConfChange::default();
cc.merge_from_bytes(&entry.data).unwrap();
latch_unlock();
self.raw_node.apply_conf_change(&cc).unwrap()
}
raft::EntryType::EntryConfChangeV2 => {
let mut cc = raft::ConfChangeV2::default();
cc.merge_from_bytes(&entry.data).unwrap();
// Unlock the latch when either of conditions is met:
// - conf_change will leave the joint state;
// - or it will be applied without even entering one.
let leave_joint = cc.leave_joint() || cc.enter_joint().is_none();
if leave_joint {
latch_unlock();
}
// ConfChangeTransition::Auto implies that at this
// moment raft-rs will implicitly propose another empty
// conf change that represents leaving the joint state.
self.raw_node.apply_conf_change(&cc).unwrap()
}
_ => unreachable!(),
};
self.raft_storage.persist_conf_state(&conf_state).unwrap();
fn handle_read_states(&mut self, read_states: &[raft::ReadState]) {
for rs in read_states {
if rs.request_ctx.is_empty() {
continue;
}
let ctx = crate::unwrap_ok_or!(
traft::EntryContextNormal::from_bytes(&rs.request_ctx),
Err(e) => {
tlog!(Error, "abnormal read_state: {e}"; "read_state" => ?rs);
continue;
}
);
if let Some(notify) = self.notifications.remove(&ctx.lc) {
notify.notify_ok(rs.index);
}
}
}
fn handle_messages(&mut self, messages: Vec<raft::Message>) {
if messages.is_empty() {
return;
}
self.main_loop_status("sending raft messages");
let mut sent_count = 0;
let mut skip_count = 0;
for msg in messages {
if msg.msg_type == raft::MessageType::MsgHeartbeat {
let instance_reachability = self.instance_reachability.borrow();
if !instance_reachability.should_send_heartbeat_this_tick(msg.to) {
skip_count += 1;
continue;
}
}
if let Err(e) = self.pool.send(msg) {
tlog!(Error, "{e}");
}
sent_count += 1;
}
tlog!(
Debug,
"done sending messages, sent: {sent_count}, skipped: {skip_count}"
);
}
fn fetch_chunkwise_snapshot(
&self,
snapshot_data: &mut SnapshotData,
entry_id: RaftEntryId,
) -> traft::Result<()> {
#[rustfmt::skip]
let mut position = snapshot_data.next_chunk_position
.expect("shouldn't be None if this function is called");
let space_dumps = &mut snapshot_data.space_dumps;
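// In debug builds, keep track of how many tuples of the current space have
// been received so far, so the chunk positions can be validated below.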
#[cfg(debug_assertions)]
let mut last_space_id = 0;
#[cfg(debug_assertions)]
let mut last_space_tuple_count = 0;
loop {
self.main_loop_status("receiving snapshot");
let Some(leader_id) = self.status.get().leader_id else {
tlog!(
Warning,
"leader id is unknown while trying to request next snapshot chunk"
);
return Err(Error::LeaderUnknown);
};
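// Sanity check (debug builds only): the chunk position we're about to
// request must line up with the last space dump received so far.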
#[cfg(debug_assertions)]
{
let last = space_dumps.last().expect("should not be empty");
if last.space_id != last_space_id {
last_space_tuple_count = 0;
}
last_space_id = last.space_id;
let mut tuples = last.tuples.as_ref();
let count = rmp::decode::read_array_len(&mut tuples)
.expect("space dump should contain a msgpack array");
last_space_tuple_count += count;
assert_eq!(last_space_id, position.space_id);
assert_eq!(last_space_tuple_count, position.tuple_offset);
}
let req = rpc::snapshot::Request { entry_id, position };
const SNAPSHOT_CHUNK_REQUEST_TIMEOUT: Duration = Duration::from_secs(60);
tlog!(Debug, "requesting next snapshot chunk";
"entry_id" => %entry_id,
"position" => %position,
);
let fut = self
.pool
.call(&leader_id, &req, SNAPSHOT_CHUNK_REQUEST_TIMEOUT);
let fut = unwrap_ok_or!(fut,
Err(e) => {
tlog!(Warning, "failed requesting next snapshot chunk: {e}");
self.main_loop_status("error when receiving snapshot");
fiber::sleep(MainLoop::TICK * 4);
continue;
}
);
let resp = fiber::block_on(fut);
let mut resp = unwrap_ok_or!(resp,
Err(e) => {
let msg = e.to_string();
if msg.contains("read view not available") {
tlog!(Warning, "aborting snapshot retrieval: {e}");
return Err(e);
}
tlog!(Warning, "failed requesting next snapshot chunk: {e}");
self.main_loop_status("error when receiving snapshot");
fiber::sleep(MainLoop::TICK * 4);
continue;
}
);
space_dumps.append(&mut resp.snapshot_data.space_dumps);
position = unwrap_some_or!(resp.snapshot_data.next_chunk_position, {
tlog!(Debug, "received final snapshot chunk");
break;
});
}
Ok(())
}
/// Prepare for applying the raft snapshot if it's not empty.
///
/// This includes:
/// - Verifying snapshot version against global & local ones;
/// - Waiting until tarantool replication proceeds if this is a read-only replica;
/// - Fetching the rest of the snapshot chunks if the first one is not the only one;
fn prepare_for_snapshot(
&self,
snapshot: &raft::Snapshot,
) -> traft::Result<Option<SnapshotData>> {
if snapshot.is_empty() {
return Ok(None);
}
let snapshot_data = crate::unwrap_ok_or!(
SnapshotData::decode(snapshot.get_data()),
Err(e) => {
tlog!(Warning, "skipping snapshot, which failed to deserialize: {e}");
return Err(e.into());
}
);
let v_snapshot = snapshot_data.schema_version;
loop {
let v_local = local_schema_version().expect("storage shouldn't fail");
let v_global = self
.storage
.properties
.global_schema_version()
.expect("storage shouldn't fail");
#[rustfmt::skip]
debug_assert!(v_global <= v_local, "global schema version is only ever increased after local");
#[rustfmt::skip]
debug_assert!(v_global <= v_snapshot, "global schema version updates are distributed via raft");
if v_local > v_snapshot {
tlog!(
Warning,
"skipping stale snapshot: local schema version: {}, snapshot schema version: {}",
v_local,
snapshot_data.schema_version,
);
return Ok(None);
}
if !self.is_readonly() {
// Replicaset leader applies the schema changes directly.
break;
}
if v_local == v_snapshot {
// Replicaset follower has synced schema with the leader,
// now global space dumps should be handled.
break;
}
tlog!(Debug, "v_local: {v_local}, v_snapshot: {v_snapshot}");
self.main_loop_status("awaiting replication");
// Replicaset follower needs to sync with leader via tarantool
// replication.
let timeout = MainLoop::TICK * 4;
fiber::sleep(timeout);
}
let mut snapshot_data = snapshot_data;
if snapshot_data.next_chunk_position.is_some() {
self.main_loop_status("receiving snapshot");
let entry_id = RaftEntryId {
index: snapshot.get_metadata().index,
term: snapshot.get_metadata().term,
};
if let Err(e) = self.fetch_chunkwise_snapshot(&mut snapshot_data, entry_id) {
// Error has been logged.
tlog!(Warning, "dropping snapshot data");
return Err(e);
}
}
Ok(Some(snapshot_data))
}
#[inline(always)]
fn main_loop_status(&self, status: &'static str) {
if self.status.get().main_loop_status == status {
return;
}
tlog!(Debug, "main_loop_status = '{status}'");
self.status
.send_modify(|s| s.main_loop_status = status)
.expect("status shouldn't ever be borrowed across yields");
/// Processes a so-called "ready state" of the [`raft::RawNode`].
///
/// This includes:
/// - Sending messages to other instances (raft nodes);
/// - Handling raft snapshot:
/// - Verifying & waiting until snapshot data can be applied
/// (see [`Self::prepare_for_snapshot`] for more details);
/// - Persisting snapshot metadata;
/// - Compacting the raft log;
/// - Restoring local storage contents from the snapshot;
/// - Persisting uncommitted entries;
/// - Persisting hard state (term, vote, commit);
/// - Applying committed entries;
/// - Notifying pending fibers;
/// - Waking up the governor loop, so that it can handle any global state
/// changes;
///
/// Returns an error if the instance was expelled from the cluster.
///
/// See also:
///
/// - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raw_node.rs#L85>
/// - or better <https://github.com/etcd-io/etcd/blob/v3.5.5/raft/node.go#L49>
///
/// This function yields.
fn advance(&mut self) -> traft::Result<()> {
// Handle any unreachable nodes from previous iteration.
let unreachables = self
.instance_reachability
.borrow_mut()
.take_unreachables_to_report();
for raft_id in unreachables {
self.raw_node.report_unreachable(raft_id);
// TODO: remove infos when instances are expelled.
let Some(pr) = self.raw_node.raft.mut_prs().get_mut(raft_id) else {
continue;
};
// NOTE: Raft-rs will not check if the node should be paused until
// a new raft entry is appended to the log. This means that once an
// instance goes silent it will still be bombarded with heartbeats
// until someone proposes an operation. This is a workaround for
// that particular case.
// The instance's state would've only changed if it was not in the
// Snapshot state, so we have to check for that.
if pr.state == ::raft::ProgressState::Probe {
pr.pause();
}
}
// Get the `Ready` with `RawNode::ready` interface.
if !self.raw_node.has_ready() {
return Ok(());
}
let mut expelled = false;
let mut ready: raft::Ready = self.raw_node.ready();
// Apply soft state changes before anything else, so that this info is
// available for other fibers as soon as main loop yields.
if let Some(ss) = ready.ss() {
self.status
.send_modify(|s| {
s.leader_id = (ss.leader_id != INVALID_ID).then_some(ss.leader_id);
s.raft_state = ss.raft_state.into();
})
.expect("status shouldn't ever be borrowed across yields");
}
// These messages are only available on leader. Send them out ASAP.
self.handle_messages(ready.take_messages());
// Handle read states before applying snapshot which may fail.
self.handle_read_states(ready.read_states());
// Raft snapshot has arrived, check if we need to apply it.
let snapshot = ready.snapshot();
let Ok(snapshot_data) = self.prepare_for_snapshot(snapshot) else {
// Error was already logged
return Ok(());
};
// Persist stuff raft wants us to persist.
let hard_state = ready.hs();
let entries_to_persist = ready.entries();
if hard_state.is_some() || !entries_to_persist.is_empty() || snapshot_data.is_some() {
let mut new_term = None;
let mut new_applied = None;
if let Err(e) = transaction(|| -> Result<(), Error> {
self.main_loop_status("persisting hard state, entries and/or snapshot");
// Raft HardState changed, and we need to persist it.
if let Some(hard_state) = hard_state {
tlog!(Debug, "hard state: {hard_state:?}");
self.raft_storage.persist_hard_state(hard_state)?;
new_term = Some(hard_state.term);
}
// Persist uncommitted entries in the raft log.
if !entries_to_persist.is_empty() {
#[rustfmt::skip]
debug_assert!(snapshot.is_empty(), "can't have both the snapshot & log entries");
self.raft_storage.persist_entries(entries_to_persist)?;
}
if let Some(snapshot_data) = snapshot_data {
#[rustfmt::skip]
debug_assert!(entries_to_persist.is_empty(), "can't have both the snapshot & log entries");
// Persist snapshot metadata and compact the raft log if it wasn't empty.
let meta = snapshot.get_metadata();
self.raft_storage.handle_snapshot_metadata(meta)?;
new_applied = Some(meta.index);
// Persist the contents of the global tables from the snapshot data.
let is_master = !self.is_readonly();
self.storage
.apply_snapshot_data(&snapshot_data, is_master)?;
// TODO: As long as the snapshot was sent to us in response to
// a rejected MsgAppend (which is the only possible case
// currently), we will send a MsgAppendResponse back which will
// automatically reset our status from Snapshot to Replicate.
// But when we implement support for manual snapshot requests,
// we will have to also implement sending a MsgSnapStatus,
// to reset our status explicitly to avoid the leader ignoring us
// indefinitely after that point.
}
Ok(())
}) {
tlog!(Warning, "dropping raft ready: {ready:#?}");
panic!("transaction failed: {e}");
if let Some(new_term) = new_term {
.send_modify(|s| s.term = new_term)
.expect("status shouldn't ever be borrowed across yields");
if let Some(new_applied) = new_applied {
// handle_snapshot_metadata persists applied index, so we update the watch channel
self.applied
.send(new_applied)
.expect("applied shouldn't ever be borrowed across yields");
}
if hard_state.is_some() {
crate::error_injection!(exit "EXIT_AFTER_RAFT_PERSISTS_HARD_STATE");
}
if !entries_to_persist.is_empty() {
crate::error_injection!(exit "EXIT_AFTER_RAFT_PERSISTS_ENTRIES");
}
}
// Apply committed entries.
let committed_entries = ready.committed_entries();
if !committed_entries.is_empty() {
let res = self.handle_committed_entries(committed_entries, &mut expelled);
if let Err(e) = res {
tlog!(Warning, "dropping raft ready: {ready:#?}");
panic!("transaction failed: {e}");
}
crate::error_injection!(exit "EXIT_AFTER_RAFT_HANDLES_COMMITTED_ENTRIES");
// These messages are only available on followers. They must be sent only
// AFTER the HardState, Entries and Snapshot are persisted
// to the stable storage.
self.handle_messages(ready.take_persisted_messages());
// Advance the Raft. Make it know, that the necessary entries have been persisted.
// If this is a leader, it may commit some of the newly persisted entries.
let mut light_rd = self.raw_node.advance(ready);
// Send new message ASAP. (Only on leader)
let messages = light_rd.take_messages();
if !messages.is_empty() {
debug_assert!(self.raw_node.raft.state == RaftStateRole::Leader);
self.handle_messages(messages);
}
// Update commit index. (Only on leader)
if let Some(commit) = light_rd.commit_index() {
debug_assert!(self.raw_node.raft.state == RaftStateRole::Leader);
if let Err(e) = transaction(|| -> Result<(), Error> {
self.main_loop_status("persisting commit index");
tlog!(Debug, "commit index: {}", commit);
self.raft_storage.persist_commit(commit)?;
Ok(())
}) {
tlog!(Warning, "dropping raft light ready: {light_rd:#?}");
panic!("transaction failed: {e}");
}
crate::error_injection!(block "BLOCK_AFTER_RAFT_PERSISTS_COMMIT_INDEX");
// Apply committed entries.
// These are probably entries which we've just persisted.
let committed_entries = light_rd.committed_entries();
if !committed_entries.is_empty() {
let res = self.handle_committed_entries(committed_entries, &mut expelled);
if let Err(e) = res {
panic!("transaction failed: {e}");
}
crate::error_injection!(exit "EXIT_AFTER_RAFT_HANDLES_COMMITTED_ENTRIES");
// Advance the apply index.
self.raw_node.advance_apply();
self.main_loop_status("idle");
if expelled {
return Err(Error::Expelled);
}
Ok(())
/// Check if this is a read-only replica. This function is called when we
/// need to determine if this instance should be changing the schema
/// definition or if it should instead synchronize with a master.
///
/// Note: it would be a little more reliable to check if the replica is
/// chosen to be a master by checking master_id in _pico_replicaset, but
/// currently we cannot do that, because tarantool replication is being
/// done asynchronously with raft log replication. Basically, an instance needs
/// to know it's a replicaset master before it can access the replicaset
/// info.
fn is_readonly(&self) -> bool {
let is_ro: bool = crate::tarantool::eval("return box.info.ro")
.expect("checking read-onlyness should never fail");
is_ro
}
#[inline]
fn cleanup_notifications(&mut self) {
self.notifications.retain(|_, notify| !notify.is_closed());
}
/// Generates a pair of logical clock and a notification channel.
/// Logical clock is a unique identifier suitable for tagging
/// entries in raft log. Notification is broadcasted when the
/// corresponding entry is committed.
#[inline]
fn schedule_notification(&mut self) -> (LogicalClock, Notify) {
let (tx, rx) = notification();
let lc = {
self.lc.inc();
self.lc
};
self.notifications.insert(lc, tx);
(lc, rx)
}
/// Return value of [`NodeImpl::handle_committed_normal_entry`]. Explains what should be
/// done as a result of attempting to apply a given entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ApplyEntryResult {
/// This entry failed to apply for some reason, and must be retried later.
SleepAndRetry,
/// Entry applied successfully, proceed to next entry.
EntryApplied,
}
pub(crate) struct MainLoop {
_loop: Option<fiber::JoinHandle<'static, ()>>,
loop_waker: watch::Sender<()>,
stop_flag: Rc<Cell<bool>>,
}
struct MainLoopState {
node_impl: Rc<Mutex<NodeImpl>>,
loop_waker: watch::Receiver<()>,
stop_flag: Rc<Cell<bool>>,
}
impl MainLoop {
pub const TICK: Duration = Duration::from_millis(100);
fn start(node_impl: Rc<Mutex<NodeImpl>>) -> Self {
let (loop_waker_tx, loop_waker_rx) = watch::channel(());
let stop_flag: Rc<Cell<bool>> = Default::default();
let state = MainLoopState {
node_impl,
loop_waker: loop_waker_rx,
stop_flag: stop_flag.clone(),
};
Self {
_loop: loop_start!("raft_main_loop", Self::iter_fn, state),
loop_waker: loop_waker_tx,
stop_flag,
}
}
pub fn wakeup(&self) {
let _ = self.loop_waker.send(());
}
async fn iter_fn(state: &mut MainLoopState) -> FlowControl {
let _ = state.loop_waker.changed().timeout(Self::TICK).await;
if state.stop_flag.take() {
return FlowControl::Break;
}
// FIXME: potential deadlock - can't use sync mutex in async fn
let mut node_impl = state.node_impl.lock(); // yields
if state.stop_flag.take() {
return FlowControl::Break;
}
node_impl.cleanup_notifications();
state.next_tick = now.saturating_add(Self::TICK);
let res = node_impl.advance(); // yields
drop(node_impl);
if state.stop_flag.take() {
return FlowControl::Break;
}
match res {
Err(e @ Error::Expelled) => {
tlog!(Info, "{e}, shutting down");
crate::tarantool::exit(0);
}
Err(e) => {
tlog!(Error, "error during raft main loop iteration: {e}");
Ok(()) => {}
FlowControl::Continue
}
}
impl Drop for MainLoop {
fn drop(&mut self) {
self.stop_flag.set(true);
let _ = self.loop_waker.send(());
}
}
pub fn global() -> traft::Result<&'static Node> {
// Uninitialized raft node is a regular case. This case may take
// place while the instance is executing `start_discover()` function.
// It has already started listening, but the node is only initialized
// in `postjoin()`.
unsafe { RAFT_NODE.as_deref() }.ok_or(Error::Uninitialized)
}
fn proc_raft_interact(pbs: Vec<traft::MessagePb>) -> traft::Result<()> {
node.step_and_yield(raft::Message::try_from(pb).map_err(Error::other)?);