diff --git a/src/governor/mod.rs b/src/governor/mod.rs
index bbda23834cb40bd72710d3c7b69fd79630dbabaa..ab8311e6b90ef1cc111652b5f8c7976f342a5e2a 100644
--- a/src/governor/mod.rs
+++ b/src/governor/mod.rs
@@ -38,15 +38,17 @@ impl Loop {
 
     async fn iter_fn(
         State {
+            governor_status,
             storage,
             raft_storage,
-            status,
+            raft_status,
             waker,
             pool,
         }: &mut State,
     ) -> FlowControl {
-        if !status.get().raft_state.is_leader() {
-            status.changed().await.unwrap();
+        if !raft_status.get().raft_state.is_leader() {
+            set_status(governor_status, "not a leader");
+            raft_status.changed().await.unwrap();
             return Continue;
         }
 
@@ -87,7 +89,7 @@ impl Loop {
             .collect();
         let tiers: HashMap<_, _> = tiers.iter().map(|tier| (&tier.name, tier)).collect();
 
-        let term = status.get().term;
+        let term = raft_status.get().term;
         let applied = raft_storage.applied().expect("storage should never fail");
         let cluster_id = raft_storage
             .cluster_id()
@@ -148,6 +150,7 @@ impl Loop {
 
         match plan {
             Plan::ConfChange(ConfChange { conf_change }) => {
+                set_status(governor_status, "conf change");
                 // main_loop gives the warranty that every ProposeConfChange
                 // will sometimes be handled and there's no need in timeout.
                 // It also guarantees that the notification will arrive only
@@ -160,12 +163,14 @@ impl Loop {
             }
 
             Plan::TransferLeadership(TransferLeadership { to }) => {
+                set_status(governor_status, "transfer raft leader");
                 tlog!(Info, "transferring leadership to {}", to.instance_id);
                 node.transfer_leadership_and_yield(to.raft_id);
                 _ = waker.changed().timeout(Loop::RETRY_TIMEOUT).await;
             }
 
             Plan::UpdateTargetReplicasetMaster(UpdateTargetReplicasetMaster { op }) => {
+                set_status(governor_status, "update target replication leader");
                 governor_step! {
                     "proposing replicaset target master change"
                     async {
@@ -182,6 +187,7 @@ impl Loop {
                 replicaset_id,
                 op,
             }) => {
+                set_status(governor_status, "transfer replication leader");
                 tlog!(
                     Info,
                     "transferring replicaset mastership from {old_master_id} to {new_master_id}"
                 );
@@ -227,6 +233,7 @@ impl Loop {
             }
 
             Plan::Downgrade(Downgrade { req }) => {
+                set_status(governor_status, "update instance grade to offline");
                 tlog!(Info, "downgrading instance {}", req.instance_id);
 
                 let instance_id = req.instance_id.clone();
@@ -248,6 +255,7 @@ impl Loop {
                 rpc,
                 op,
             }) => {
+                set_status(governor_status, "create new replicaset");
                 governor_step! {
                     "promoting new replicaset master" [
                         "master_id" => %master_id,
@@ -276,6 +284,7 @@ impl Loop {
                 replicaset_peers,
                 req,
             }) => {
+                set_status(governor_status, "configure replication");
                 governor_step! {
                     "configuring replication"
                     async {
@@ -325,6 +334,7 @@ impl Loop {
             }
 
             Plan::ShardingInit(ShardingInit { targets, rpc, req }) => {
+                set_status(governor_status, "configure sharding");
                 governor_step! {
                     "configuring sharding"
                     async {
@@ -368,6 +378,7 @@ impl Loop {
             }
 
             Plan::ShardingBoot(ShardingBoot { target, rpc, op }) => {
+                set_status(governor_status, "bootstrap bucket distribution");
                 governor_step! {
                     "bootstrapping bucket distribution" [
                         "instance_id" => %target,
@@ -383,6 +394,7 @@ impl Loop {
             }
 
             Plan::ProposeReplicasetStateChanges(ProposeReplicasetStateChanges { op }) => {
+                set_status(governor_status, "update replicaset state");
                 governor_step! {
                     "proposing replicaset state change"
                     async {
@@ -391,21 +403,8 @@ impl Loop {
                 }
             }
 
-            Plan::SkipSharding(SkipSharding { req }) => {
-                let instance_id = req.instance_id.clone();
-                let current_grade = req.current_grade.expect("must be set");
-                governor_step! {
-                    "handling instance grade change" [
-                        "instance_id" => %instance_id,
-                        "current_grade" => %current_grade,
-                    ]
-                    async {
-                        handle_update_instance_request_and_wait(req, Loop::UPDATE_INSTANCE_TIMEOUT)?
-                    }
-                }
-            }
-
             Plan::ToOnline(ToOnline { target, rpc, req }) => {
+                set_status(governor_status, "update instance grade to online");
                 if let Some(rpc) = rpc {
                     governor_step! {
                         "updating sharding config" [
@@ -431,6 +430,7 @@ impl Loop {
             }
 
             Plan::ApplySchemaChange(ApplySchemaChange { targets, rpc }) => {
+                set_status(governor_status, "apply clusterwide schema change");
                 let mut next_op = Op::Nop;
                 governor_step! {
                     "applying pending schema change"
@@ -498,6 +498,7 @@ impl Loop {
             }
 
             Plan::UpdateTargetVshardConfig(UpdateTargetVshardConfig { dml }) => {
+                set_status(governor_status, "update target sharding configuration");
                 governor_step! {
                     "updating target vshard config"
                     async {
@@ -507,6 +508,7 @@ impl Loop {
             }
 
             Plan::UpdateCurrentVshardConfig(UpdateCurrentVshardConfig { targets, rpc, dml }) => {
+                set_status(governor_status, "update current sharding configuration");
                 governor_step! {
                     "applying vshard config changes"
                     async {
@@ -537,6 +539,7 @@ impl Loop {
             }
 
             Plan::None => {
+                set_status(governor_status, "idle");
                 tlog!(Info, "nothing to do, waiting for events to handle");
                 waker.mark_seen();
                 _ = waker.changed().await;
@@ -548,16 +551,20 @@ impl Loop {
 
     pub fn start(
         pool: Rc<ConnectionPool>,
-        status: watch::Receiver<Status>,
+        raft_status: watch::Receiver<Status>,
         storage: Clusterwide,
         raft_storage: RaftSpaceAccess,
     ) -> Self {
         let (waker_tx, waker_rx) = watch::channel(());
+        let (governor_status_tx, governor_status_rx) = watch::channel(GovernorStatus {
+            governor_loop_status: "initializing",
+        });
 
         let state = State {
+            governor_status: governor_status_tx,
             storage,
             raft_storage,
-            status,
+            raft_status,
             waker: waker_rx,
             pool,
         };
@@ -565,6 +572,7 @@ impl Loop {
 
         Self {
             _loop: crate::loop_start!("governor_loop", Self::iter_fn, state),
             waker: waker_tx,
+            status: governor_status_rx,
         }
     }
@@ -573,15 +581,41 @@ impl Loop {
     }
 }
 
+#[inline(always)]
+fn set_status(status: &mut watch::Sender<GovernorStatus>, msg: &'static str) {
+    if status.get().governor_loop_status == msg {
+        return;
+    }
+    tlog!(Debug, "governor_loop_status = '{msg}'");
+    status
+        .send_modify(|s| s.governor_loop_status = msg)
+        .expect("status shouldn't ever be borrowed across yields");
+}
+
 pub struct Loop {
     _loop: Option<fiber::JoinHandle<'static, ()>>,
     waker: watch::Sender<()>,
+
+    /// Current status of governor loop.
+    ///
+    // XXX: maybe this shouldn't be a watch::Receiver, but it's not much worse
+    // than a Rc, so ...
+    pub status: watch::Receiver<GovernorStatus>,
 }
 
 struct State {
+    governor_status: watch::Sender<GovernorStatus>,
     storage: Clusterwide,
     raft_storage: RaftSpaceAccess,
-    status: watch::Receiver<Status>,
+    raft_status: watch::Receiver<Status>,
     waker: watch::Receiver<()>,
     pool: Rc<ConnectionPool>,
 }
+
+#[derive(Debug, Clone, Copy)]
+pub struct GovernorStatus {
+    /// Current state of the governor loop.
+    ///
+    /// Is set by governor to explain the reason why it has yielded.
+    pub governor_loop_status: &'static str,
+}
diff --git a/src/governor/plan.rs b/src/governor/plan.rs
index 2326ad1659835bf80edfcc633cbe1f9e5c83d8e2..1ca07a561f2e1704719a501d72c3f82270ece14c 100644
--- a/src/governor/plan.rs
+++ b/src/governor/plan.rs
@@ -475,10 +475,6 @@ pub mod stage {
         pub req: rpc::update_instance::Request,
     }
 
-    pub struct SkipSharding {
-        pub req: rpc::update_instance::Request,
-    }
-
     pub struct ShardingBoot<'i> {
         pub target: &'i InstanceId,
         pub rpc: rpc::sharding::bootstrap::Request,
diff --git a/src/luamod.rs b/src/luamod.rs
index 9fdec1f23c9702b3c81a126b05364d166d24c362..2797e494b8ac560365547504d3b25454bd95d762 100644
--- a/src/luamod.rs
+++ b/src/luamod.rs
@@ -281,6 +281,31 @@ pub(crate) fn setup(args: &args::Run) {
         tlua::function0(|| traft::node::global().map(|n| n.status())),
     );
 
+    ///////////////////////////////////////////////////////////////////////////
+    // FIXME: too many statuses
+    luamod_set(
+        &l,
+        "_governor_loop_status",
+        indoc! {"
+        pico._governor_loop_status()
+        ============================
+
+        Returns the governor loop status.
+
+        Returns:
+
+            (string)
+            or
+            (nil, string) in case of an error, if the raft node is
+                not initialized yet
+
+        "},
+        tlua::Function::new(|| -> traft::Result<&'static str> {
+            let node = traft::node::global()?;
+            Ok(node.governor_loop.status.get().governor_loop_status)
+        }),
+    );
+
     luamod_set(
         &l,
         "raft_tick",
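
Side note, not part of the patch itself: since `Loop::start` now hands out a `watch::Receiver<GovernorStatus>` as `Loop::status`, other subsystems can observe the governor's self-reported state with the same `get()`/`changed()` API already used in the diff. The sketch below is hypothetical (the helper name and the expect message are invented); it relies only on `watch::Receiver::get()`, `changed()`, and the "idle" string assigned in the `Plan::None` arm.

    // Hypothetical helper: block the calling fiber until the governor reports "idle".
    async fn wait_for_governor_idle(mut status: watch::Receiver<GovernorStatus>) {
        // `GovernorStatus` is `Copy`, so `get()` returns a cheap snapshot.
        while status.get().governor_loop_status != "idle" {
            // Wait for the governor to publish a new status via `send_modify`.
            status.changed().await.expect("governor loop should outlive the caller");
        }
    }

From Lua, the same value is reachable through the new `pico._governor_loop_status()` built-in added in `src/luamod.rs`, which returns one of the strings set above ("idle", "not a leader", "conf change", and so on).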